Copying from glance

This commit is contained in:
Flavio Percoco 2014-01-24 18:30:46 +01:00
commit 6afd6df9d4
18 changed files with 4642 additions and 0 deletions

26
.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
*.pyc
*.swp
*.log
.glance-venv
.venv
.tox
.coverage
cover/*
nosetests.xml
coverage.xml
glance.sqlite
AUTHORS
ChangeLog
build
doc/source/api
dist
*.egg
glance.egg-info
tests.sqlite
glance/versioninfo
# Files created by doc build
doc/source/api
# IDE files
.project
.pydevproject

176
LICENSE Normal file
View File

@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

4
README.rst Normal file
View File

@ -0,0 +1,4 @@
Glance Store Library
=====================
Glance's stores library

0
glance/__init__.py Normal file
View File

715
glance/store/__init__.py Normal file
View File

@ -0,0 +1,715 @@
# Copyright 2010-2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import sys
from oslo.config import cfg
from glance.common import exception
from glance.common import utils
import glance.context
import glance.domain.proxy
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.store import location
from glance.store import scrubber
LOG = logging.getLogger(__name__)
store_opts = [
cfg.ListOpt('known_stores',
default=[
'glance.store.filesystem.Store',
'glance.store.http.Store',
'glance.store.rbd.Store',
'glance.store.s3.Store',
'glance.store.swift.Store',
'glance.store.sheepdog.Store',
'glance.store.cinder.Store',
],
help=_('List of which store classes and store class locations '
'are currently known to glance at startup.')),
cfg.StrOpt('default_store', default='file',
help=_("Default scheme to use to store image data. The "
"scheme must be registered by one of the stores "
"defined by the 'known_stores' config option.")),
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf')),
cfg.BoolOpt('delayed_delete', default=False,
help=_('Turn on/off delayed delete.')),
cfg.IntOpt('scrub_time', default=0,
help=_('The amount of time in seconds to delay before '
'performing a delete.')),
]
CONF = cfg.CONF
CONF.register_opts(store_opts)
class BackendException(Exception):
pass
class UnsupportedBackend(BackendException):
pass
class Indexable(object):
"""
Wrapper that allows an iterator or filelike be treated as an indexable
data structure. This is required in the case where the return value from
Store.get() is passed to Store.add() when adding a Copy-From image to a
Store where the client library relies on eventlet GreenSockets, in which
case the data to be written is indexed over.
"""
def __init__(self, wrapped, size):
"""
Initialize the object
:param wrappped: the wrapped iterator or filelike.
:param size: the size of data available
"""
self.wrapped = wrapped
self.size = int(size) if size else (wrapped.len
if hasattr(wrapped, 'len') else 0)
self.cursor = 0
self.chunk = None
def __iter__(self):
"""
Delegate iteration to the wrapped instance.
"""
for self.chunk in self.wrapped:
yield self.chunk
def __getitem__(self, i):
"""
Index into the next chunk (or previous chunk in the case where
the last data returned was not fully consumed).
:param i: a slice-to-the-end
"""
start = i.start if isinstance(i, slice) else i
if start < self.cursor:
return self.chunk[(start - self.cursor):]
self.chunk = self.another()
if self.chunk:
self.cursor += len(self.chunk)
return self.chunk
def another(self):
"""Implemented by subclasses to return the next element"""
raise NotImplementedError
def getvalue(self):
"""
Return entire string value... used in testing
"""
return self.wrapped.getvalue()
def __len__(self):
"""
Length accessor.
"""
return self.size
def _get_store_class(store_entry):
store_cls = None
try:
LOG.debug("Attempting to import store %s", store_entry)
store_cls = importutils.import_class(store_entry)
except exception.NotFound:
raise BackendException('Unable to load store. '
'Could not find a class named %s.'
% store_entry)
return store_cls
def create_stores():
"""
Registers all store modules and all schemes
from the given config. Duplicates are not re-registered.
"""
store_count = 0
store_classes = set()
for store_entry in CONF.known_stores:
store_entry = store_entry.strip()
if not store_entry:
continue
store_cls = _get_store_class(store_entry)
try:
store_instance = store_cls()
except exception.BadStoreConfiguration as e:
LOG.warn(_("%s Skipping store driver.") % unicode(e))
continue
schemes = store_instance.get_schemes()
if not schemes:
raise BackendException('Unable to register store %s. '
'No schemes associated with it.'
% store_cls)
else:
if store_cls not in store_classes:
LOG.debug("Registering store %s with schemes %s",
store_cls, schemes)
store_classes.add(store_cls)
scheme_map = {}
for scheme in schemes:
loc_cls = store_instance.get_store_location_class()
scheme_map[scheme] = {
'store_class': store_cls,
'location_class': loc_cls,
}
location.register_scheme_map(scheme_map)
store_count += 1
else:
LOG.debug("Store %s already registered", store_cls)
return store_count
def verify_default_store():
scheme = cfg.CONF.default_store
context = glance.context.RequestContext()
try:
get_store_from_scheme(context, scheme)
except exception.UnknownScheme:
msg = _("Store for scheme %s not found") % scheme
raise RuntimeError(msg)
def get_known_schemes():
"""Returns list of known schemes"""
return location.SCHEME_TO_CLS_MAP.keys()
def get_store_from_scheme(context, scheme, loc=None):
"""
Given a scheme, return the appropriate store object
for handling that scheme.
"""
if scheme not in location.SCHEME_TO_CLS_MAP:
raise exception.UnknownScheme(scheme=scheme)
scheme_info = location.SCHEME_TO_CLS_MAP[scheme]
store = scheme_info['store_class'](context, loc)
return store
def get_store_from_uri(context, uri, loc=None):
"""
Given a URI, return the store object that would handle
operations on the URI.
:param uri: URI to analyze
"""
scheme = uri[0:uri.find('/') - 1]
store = get_store_from_scheme(context, scheme, loc)
return store
def get_from_backend(context, uri, **kwargs):
"""Yields chunks of data from backend specified by uri"""
loc = location.get_location_from_uri(uri)
store = get_store_from_uri(context, uri, loc)
try:
return store.get(loc)
except NotImplementedError:
raise exception.StoreGetNotSupported
def get_size_from_backend(context, uri):
"""Retrieves image size from backend specified by uri"""
loc = location.get_location_from_uri(uri)
store = get_store_from_uri(context, uri, loc)
return store.get_size(loc)
def delete_from_backend(context, uri, **kwargs):
"""Removes chunks of data from backend specified by uri"""
loc = location.get_location_from_uri(uri)
store = get_store_from_uri(context, uri, loc)
try:
return store.delete(loc)
except NotImplementedError:
raise exception.StoreDeleteNotSupported
def get_store_from_location(uri):
"""
Given a location (assumed to be a URL), attempt to determine
the store from the location. We use here a simple guess that
the scheme of the parsed URL is the store...
:param uri: Location to check for the store
"""
loc = location.get_location_from_uri(uri)
return loc.store_name
def safe_delete_from_backend(context, uri, image_id, **kwargs):
"""Given a uri, delete an image from the store."""
try:
return delete_from_backend(context, uri, **kwargs)
except exception.NotFound:
msg = _('Failed to delete image %s in store from URI')
LOG.warn(msg % image_id)
except exception.StoreDeleteNotSupported as e:
LOG.warn(str(e))
except UnsupportedBackend:
exc_type = sys.exc_info()[0].__name__
msg = (_('Failed to delete image %s from store (%s)') %
(image_id, exc_type))
LOG.error(msg)
def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs):
"""Given a uri, schedule the deletion of an image location."""
(file_queue, _db_queue) = scrubber.get_scrub_queues()
# NOTE(zhiyan): Defautly ask glance-api store using file based queue.
# In future we can change it using DB based queued instead,
# such as using image location's status to saving pending delete flag
# when that property be added.
file_queue.add_location(image_id, uri)
def delete_image_from_backend(context, store_api, image_id, uri):
if CONF.delayed_delete:
store_api.schedule_delayed_delete_from_backend(context, uri, image_id)
else:
store_api.safe_delete_from_backend(context, uri, image_id)
def check_location_metadata(val, key=''):
if isinstance(val, dict):
for key in val:
check_location_metadata(val[key], key=key)
elif isinstance(val, list):
ndx = 0
for v in val:
check_location_metadata(v, key='%s[%d]' % (key, ndx))
ndx = ndx + 1
elif not isinstance(val, unicode):
raise BackendException(_("The image metadata key %s has an invalid "
"type of %s. Only dict, list, and unicode "
"are supported.") % (key, type(val)))
def store_add_to_backend(image_id, data, size, store):
"""
A wrapper around a call to each stores add() method. This gives glance
a common place to check the output
:param image_id: The image add to which data is added
:param data: The data to be stored
:param size: The length of the data in bytes
:param store: The store to which the data is being added
:return: The url location of the file,
the size amount of data,
the checksum of the data
the storage systems metadata dictionary for the location
"""
(location, size, checksum, metadata) = store.add(image_id, data, size)
if metadata is not None:
if not isinstance(metadata, dict):
msg = (_("The storage driver %s returned invalid metadata %s"
"This must be a dictionary type") %
(str(store), str(metadata)))
LOG.error(msg)
raise BackendException(msg)
try:
check_location_metadata(metadata)
except BackendException as e:
e_msg = (_("A bad metadata structure was returned from the "
"%s storage driver: %s. %s.") %
(str(store), str(metadata), str(e)))
LOG.error(e_msg)
raise BackendException(e_msg)
return (location, size, checksum, metadata)
def add_to_backend(context, scheme, image_id, data, size):
store = get_store_from_scheme(context, scheme)
try:
return store_add_to_backend(image_id, data, size, store)
except NotImplementedError:
raise exception.StoreAddNotSupported
def set_acls(context, location_uri, public=False, read_tenants=[],
write_tenants=[]):
loc = location.get_location_from_uri(location_uri)
scheme = get_store_from_location(location_uri)
store = get_store_from_scheme(context, scheme, loc)
try:
store.set_acls(loc, public=public, read_tenants=read_tenants,
write_tenants=write_tenants)
except NotImplementedError:
LOG.debug(_("Skipping store.set_acls... not implemented."))
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, store_api):
self.context = context
self.store_api = store_api
proxy_kwargs = {'context': context, 'store_api': store_api}
super(ImageRepoProxy, self).__init__(image_repo,
item_proxy_class=ImageProxy,
item_proxy_kwargs=proxy_kwargs)
def _set_acls(self, image):
public = image.visibility == 'public'
member_ids = []
if image.locations and not public:
member_repo = image.get_member_repo()
member_ids = [m.member_id for m in member_repo.list()]
for location in image.locations:
self.store_api.set_acls(self.context, location['url'], public,
read_tenants=member_ids)
def add(self, image):
result = super(ImageRepoProxy, self).add(image)
self._set_acls(image)
return result
def save(self, image):
result = super(ImageRepoProxy, self).save(image)
self._set_acls(image)
return result
def _check_location_uri(context, store_api, uri):
"""
Check if an image location uri is valid.
:param context: Glance request context
:param store_api: store API module
:param uri: location's uri string
"""
is_ok = True
try:
size = store_api.get_size_from_backend(context, uri)
# NOTE(zhiyan): Some stores return zero when it catch exception
is_ok = size > 0
except (exception.UnknownScheme, exception.NotFound):
is_ok = False
if not is_ok:
raise exception.BadStoreUri(_('Invalid location: %s') % uri)
def _check_image_location(context, store_api, location):
_check_location_uri(context, store_api, location['url'])
store_api.check_location_metadata(location['metadata'])
def _set_image_size(context, image, locations):
if not image.size:
for location in locations:
size_from_backend = glance.store.get_size_from_backend(
context, location['url'])
if size_from_backend:
# NOTE(flwang): This assumes all locations have the same size
image.size = size_from_backend
break
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, factory, context, store_api):
self.context = context
self.store_api = store_api
proxy_kwargs = {'context': context, 'store_api': store_api}
super(ImageFactoryProxy, self).__init__(factory,
proxy_class=ImageProxy,
proxy_kwargs=proxy_kwargs)
def new_image(self, **kwargs):
locations = kwargs.get('locations', [])
for l in locations:
_check_image_location(self.context, self.store_api, l)
if locations.count(l) > 1:
raise exception.DuplicateLocation(location=l['url'])
return super(ImageFactoryProxy, self).new_image(**kwargs)
class StoreLocations(collections.MutableSequence):
"""
The proxy for store location property. It takes responsibility for:
1. Location uri correctness checking when adding a new location.
2. Remove the image data from the store when a location is removed
from an image.
"""
def __init__(self, image_proxy, value):
self.image_proxy = image_proxy
if isinstance(value, list):
self.value = value
else:
self.value = list(value)
def append(self, location):
# NOTE(flaper87): Insert this
# location at the very end of
# the value list.
self.insert(len(self.value), location)
def extend(self, other):
if isinstance(other, StoreLocations):
locations = other.value
else:
locations = list(other)
for location in locations:
self.append(location)
def insert(self, i, location):
_check_image_location(self.image_proxy.context,
self.image_proxy.store_api, location)
if location in self.value:
raise exception.DuplicateLocation(location=location['url'])
self.value.insert(i, location)
_set_image_size(self.image_proxy.context,
self.image_proxy,
[location])
def pop(self, i=-1):
location = self.value.pop(i)
try:
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
except Exception:
self.value.insert(i, location)
raise
return location
def count(self, location):
return self.value.count(location)
def index(self, location, *args):
return self.value.index(location, *args)
def remove(self, location):
if self.count(location):
self.pop(self.index(location))
else:
self.value.remove(location)
def reverse(self):
self.value.reverse()
# Mutable sequence, so not hashable
__hash__ = None
def __getitem__(self, i):
return self.value.__getitem__(i)
def __setitem__(self, i, location):
_check_image_location(self.image_proxy.context,
self.image_proxy.store_api, location)
self.value.__setitem__(i, location)
_set_image_size(self.image_proxy.context,
self.image_proxy,
[location])
def __delitem__(self, i):
location = None
try:
location = self.value.__getitem__(i)
except Exception:
return self.value.__delitem__(i)
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
self.value.__delitem__(i)
def __delslice__(self, i, j):
i = max(i, 0)
j = max(j, 0)
locations = []
try:
locations = self.value.__getslice__(i, j)
except Exception:
return self.value.__delslice__(i, j)
for location in locations:
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
self.value.__delitem__(i)
def __iadd__(self, other):
self.extend(other)
return self
def __contains__(self, location):
return location in self.value
def __len__(self):
return len(self.value)
def __cast(self, other):
if isinstance(other, StoreLocations):
return other.value
else:
return other
def __cmp__(self, other):
return cmp(self.value, self.__cast(other))
def __iter__(self):
return iter(self.value)
def _locations_proxy(target, attr):
"""
Make a location property proxy on the image object.
:param target: the image object on which to add the proxy
:param attr: the property proxy we want to hook
"""
def get_attr(self):
value = getattr(getattr(self, target), attr)
return StoreLocations(self, value)
def set_attr(self, value):
if not isinstance(value, (list, StoreLocations)):
raise exception.BadStoreUri(_('Invalid locations: %s') % value)
ori_value = getattr(getattr(self, target), attr)
if ori_value != value:
# NOTE(zhiyan): Enforced locations list was previously empty list.
if len(ori_value) > 0:
raise exception.Invalid(_('Original locations is not empty: '
'%s') % ori_value)
# NOTE(zhiyan): Check locations are all valid.
for location in value:
_check_image_location(self.context, self.store_api,
location)
if value.count(location) > 1:
raise exception.DuplicateLocation(location=location['url'])
_set_image_size(self.context, getattr(self, target), value)
return setattr(getattr(self, target), attr, list(value))
def del_attr(self):
value = getattr(getattr(self, target), attr)
while len(value):
delete_image_from_backend(self.context, self.store_api,
self.image.image_id, value[0]['url'])
del value[0]
setattr(getattr(self, target), attr, value)
return delattr(getattr(self, target), attr)
return property(get_attr, set_attr, del_attr)
class ImageProxy(glance.domain.proxy.Image):
locations = _locations_proxy('image', 'locations')
def __init__(self, image, context, store_api):
self.image = image
self.context = context
self.store_api = store_api
proxy_kwargs = {
'context': context,
'image': self,
'store_api': store_api,
}
super(ImageProxy, self).__init__(
image, member_repo_proxy_class=ImageMemberRepoProxy,
member_repo_proxy_kwargs=proxy_kwargs)
def delete(self):
self.image.delete()
if self.image.locations:
for location in self.image.locations:
self.store_api.delete_image_from_backend(self.context,
self.store_api,
self.image.image_id,
location['url'])
def set_data(self, data, size=None):
if size is None:
size = 0 # NOTE(markwash): zero -> unknown size
location, size, checksum, loc_meta = self.store_api.add_to_backend(
self.context, CONF.default_store,
self.image.image_id, utils.CooperativeReader(data), size)
self.image.locations = [{'url': location, 'metadata': loc_meta}]
self.image.size = size
self.image.checksum = checksum
self.image.status = 'active'
def get_data(self):
if not self.image.locations:
raise exception.NotFound(_("No image data could be found"))
err = None
for loc in self.image.locations:
try:
data, size = self.store_api.get_from_backend(self.context,
loc['url'])
return data
except Exception as e:
LOG.warn(_('Get image %(id)s data from %(loc)s '
'failed: %(err)s.') % {'id': self.image.image_id,
'loc': loc, 'err': e})
err = e
# tried all locations
LOG.error(_('Glance tried all locations to get data for image %s '
'but all have failed.') % self.image.image_id)
raise err
class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, repo, image, context, store_api):
self.repo = repo
self.image = image
self.context = context
self.store_api = store_api
super(ImageMemberRepoProxy, self).__init__(repo)
def _set_acls(self):
public = self.image.visibility == 'public'
if self.image.locations and not public:
member_ids = [m.member_id for m in self.repo.list()]
for location in self.image.locations:
self.store_api.set_acls(self.context, location['url'], public,
read_tenants=member_ids)
def add(self, member):
super(ImageMemberRepoProxy, self).add(member)
self._set_acls()
def remove(self, member):
super(ImageMemberRepoProxy, self).remove(member)
self._set_acls()

167
glance/store/base.py Normal file
View File

@ -0,0 +1,167 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2012 RedHat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for all storage backends"""
from glance.common import exception
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.openstack.common import strutils
from glance.openstack.common import units
LOG = logging.getLogger(__name__)
def _exception_to_unicode(exc):
try:
return unicode(exc)
except UnicodeError:
try:
return strutils.safe_decode(str(exc), errors='ignore')
except UnicodeError:
msg = (_("Caught '%(exception)s' exception.") %
{"exception": exc.__class__.__name__})
return strutils.safe_decode(msg, errors='ignore')
class Store(object):
CHUNKSIZE = 16 * units.Mi # 16M
def __init__(self, context=None, location=None):
"""
Initialize the Store
"""
self.store_location_class = None
self.context = context
self.configure()
try:
self.configure_add()
except exception.BadStoreConfiguration as e:
self.add = self.add_disabled
msg = (_(u"Failed to configure store correctly: %s "
"Disabling add method.") % _exception_to_unicode(e))
LOG.warn(msg)
def configure(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method.
"""
pass
def get_schemes(self):
"""
Returns a tuple of schemes which this store can handle.
"""
raise NotImplementedError
def get_store_location_class(self):
"""
Returns the store location class that is used by this store.
"""
if not self.store_location_class:
class_name = "%s.StoreLocation" % (self.__module__)
LOG.debug("Late loading location class %s", class_name)
self.store_location_class = importutils.import_class(class_name)
return self.store_location_class
def configure_add(self):
"""
This is like `configure` except that it's specifically for
configuring the store to accept objects.
If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`.
"""
pass
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
raise NotImplementedError
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns the size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
raise NotImplementedError
def add_disabled(self, *args, **kwargs):
"""
Add method that raises an exception because the Store was
not able to be configured properly and therefore the add()
method would error out.
"""
raise exception.StoreAddDisabled
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, checksum
and a dictionary with storage system specific information
:raises `glance.common.exception.Duplicate` if the image already
existed
"""
raise NotImplementedError
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
raise NotImplementedError
def set_acls(self, location, public=False, read_tenants=[],
write_tenants=[]):
"""
Sets the read and write access control list for an image in the
backend store.
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:public A boolean indicating whether the image should be public.
:read_tenants A list of tenant strings which should be granted
read access for an image.
:write_tenants A list of tenant strings which should be granted
write access for an image.
"""
raise NotImplementedError

182
glance/store/cinder.py Normal file
View File

@ -0,0 +1,182 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for Cinder"""
from cinderclient import exceptions as cinder_exception
from cinderclient import service_catalog
from cinderclient.v2 import client as cinderclient
from oslo.config import cfg
from glance.common import exception
from glance.common import utils
import glance.openstack.common.log as logging
from glance.openstack.common import units
import glance.store.base
import glance.store.location
LOG = logging.getLogger(__name__)
cinder_opts = [
cfg.StrOpt('cinder_catalog_info',
default='volume:cinder:publicURL',
help='Info to match when looking for cinder in the service '
'catalog. Format is : separated values of the form: '
'<service_type>:<service_name>:<endpoint_type>'),
cfg.StrOpt('cinder_endpoint_template',
default=None,
help='Override service catalog lookup with template for cinder '
'endpoint e.g. http://localhost:8776/v1/%(project_id)s'),
cfg.StrOpt('os_region_name',
default=None,
help='Region name of this node'),
cfg.StrOpt('cinder_ca_certificates_file',
default=None,
help='Location of ca certicates file to use for cinder client '
'requests.'),
cfg.IntOpt('cinder_http_retries',
default=3,
help='Number of cinderclient retries on failed http calls'),
cfg.BoolOpt('cinder_api_insecure',
default=False,
help='Allow to perform insecure SSL requests to cinder'),
]
CONF = cfg.CONF
CONF.register_opts(cinder_opts)
def get_cinderclient(context):
if CONF.cinder_endpoint_template:
url = CONF.cinder_endpoint_template % context.to_dict()
else:
info = CONF.cinder_catalog_info
service_type, service_name, endpoint_type = info.split(':')
# extract the region if set in configuration
if CONF.os_region_name:
attr = 'region'
filter_value = CONF.os_region_name
else:
attr = None
filter_value = None
# FIXME: the cinderclient ServiceCatalog object is mis-named.
# It actually contains the entire access blob.
# Only needed parts of the service catalog are passed in, see
# nova/context.py.
compat_catalog = {
'access': {'serviceCatalog': context.service_catalog or []}}
sc = service_catalog.ServiceCatalog(compat_catalog)
url = sc.url_for(attr=attr,
filter_value=filter_value,
service_type=service_type,
service_name=service_name,
endpoint_type=endpoint_type)
LOG.debug(_('Cinderclient connection created using URL: %s') % url)
c = cinderclient.Client(context.user,
context.auth_tok,
project_id=context.tenant,
auth_url=url,
insecure=CONF.cinder_api_insecure,
retries=CONF.cinder_http_retries,
cacert=CONF.cinder_ca_certificates_file)
# noauth extracts user_id:project_id from auth_token
c.client.auth_token = context.auth_tok or '%s:%s' % (context.user,
context.tenant)
c.client.management_url = url
return c
class StoreLocation(glance.store.location.StoreLocation):
"""Class describing a Cinder URI"""
def process_specs(self):
self.scheme = self.specs.get('scheme', 'cinder')
self.volume_id = self.specs.get('volume_id')
def get_uri(self):
return "cinder://%s" % self.volume_id
def parse_uri(self, uri):
if not uri.startswith('cinder://'):
reason = _("URI must start with cinder://")
LOG.error(reason)
raise exception.BadStoreUri(uri, reason)
self.scheme = 'cinder'
self.volume_id = uri[9:]
if not utils.is_uuid_like(self.volume_id):
reason = _("URI contains invalid volume ID: %s") % self.volume_id
LOG.error(reason)
raise exception.BadStoreUri(uri, reason)
class Store(glance.store.base.Store):
"""Cinder backend store adapter."""
EXAMPLE_URL = "cinder://volume-id"
def get_schemes(self):
return ('cinder',)
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
if self.context is None:
reason = _("Cinder storage requires a context.")
raise exception.BadStoreConfiguration(store_name="cinder",
reason=reason)
if self.context.service_catalog is None:
reason = _("Cinder storage requires a service catalog.")
raise exception.BadStoreConfiguration(store_name="cinder",
reason=reason)
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file and returns the image size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
:rtype int
"""
loc = location.store_location
try:
volume = get_cinderclient(self.context).volumes.get(loc.volume_id)
# GB unit convert to byte
return volume.size * units.Gi
except cinder_exception.NotFound as e:
reason = _("Failed to get image size due to "
"volume can not be found: %s") % self.volume_id
LOG.error(reason)
raise exception.NotFound(reason)
except Exception as e:
LOG.exception(_("Failed to get image size due to "
"internal error: %s") % e)
return 0

301
glance/store/filesystem.py Normal file
View File

@ -0,0 +1,301 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A simple filesystem-backed store
"""
import errno
import hashlib
import os
import urlparse
from oslo.config import cfg
from glance.common import exception
from glance.common import utils
from glance.openstack.common import jsonutils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
import glance.store.location
LOG = logging.getLogger(__name__)
filesystem_opts = [
cfg.StrOpt('filesystem_store_datadir',
help=_('Directory to which the Filesystem backend '
'store writes images.')),
cfg.StrOpt('filesystem_store_metadata_file',
help=_("The path to a file which contains the "
"metadata to be returned with any location "
"associated with this store. The file must "
"contain a valid JSON dict."))]
CONF = cfg.CONF
CONF.register_opts(filesystem_opts)
class StoreLocation(glance.store.location.StoreLocation):
"""Class describing a Filesystem URI"""
def process_specs(self):
self.scheme = self.specs.get('scheme', 'file')
self.path = self.specs.get('path')
def get_uri(self):
return "file://%s" % self.path
def parse_uri(self, uri):
"""
Parse URLs. This method fixes an issue where credentials specified
in the URL are interpreted differently in Python 2.6.1+ than prior
versions of Python.
"""
pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('file', 'filesystem')
self.scheme = pieces.scheme
path = (pieces.netloc + pieces.path).strip()
if path == '':
reason = _("No path specified in URI: %s") % uri
LOG.debug(reason)
raise exception.BadStoreUri('No path specified')
self.path = path
class ChunkedFile(object):
"""
We send this back to the Glance API server as
something that can iterate over a large file
"""
CHUNKSIZE = 65536
def __init__(self, filepath):
self.filepath = filepath
self.fp = open(self.filepath, 'rb')
def __iter__(self):
"""Return an iterator over the image file"""
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
if chunk:
yield chunk
else:
break
finally:
self.close()
def close(self):
"""Close the internal file pointer"""
if self.fp:
self.fp.close()
self.fp = None
class Store(glance.store.base.Store):
def get_schemes(self):
return ('file', 'filesystem')
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
self.datadir = CONF.filesystem_store_datadir
if self.datadir is None:
reason = (_("Could not find %s in configuration options.") %
'filesystem_store_datadir')
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name="filesystem",
reason=reason)
if not os.path.exists(self.datadir):
msg = _("Directory to write image files does not exist "
"(%s). Creating.") % self.datadir
LOG.info(msg)
try:
os.makedirs(self.datadir)
except (IOError, OSError):
if os.path.exists(self.datadir):
# NOTE(markwash): If the path now exists, some other
# process must have beat us in the race condition. But it
# doesn't hurt, so we can safely ignore the error.
return
reason = _("Unable to create datadir: %s") % self.datadir
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name="filesystem",
reason=reason)
@staticmethod
def _resolve_location(location):
filepath = location.store_location.path
if not os.path.exists(filepath):
raise exception.NotFound(_("Image file %s not found") % filepath)
filesize = os.path.getsize(filepath)
return filepath, filesize
def _get_metadata(self):
if CONF.filesystem_store_metadata_file is None:
return {}
try:
with open(CONF.filesystem_store_metadata_file, 'r') as fptr:
metadata = jsonutils.load(fptr)
glance.store.check_location_metadata(metadata)
return metadata
except glance.store.BackendException as bee:
LOG.error(_('The JSON in the metadata file %s could not be used: '
'%s An empty dictionary will be returned '
'to the client.')
% (CONF.filesystem_store_metadata_file, str(bee)))
return {}
except IOError as ioe:
LOG.error(_('The path for the metadata file %s could not be '
'opened: %s An empty dictionary will be returned '
'to the client.')
% (CONF.filesystem_store_metadata_file, ioe))
return {}
except Exception as ex:
LOG.exception(_('An error occurred processing the storage systems '
'meta data file: %s. An empty dictionary will be '
'returned to the client.') % str(ex))
return {}
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
filepath, filesize = self._resolve_location(location)
msg = _("Found image at %s. Returning in ChunkedFile.") % filepath
LOG.debug(msg)
return (ChunkedFile(filepath), filesize)
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file and returns the image size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
:rtype int
"""
filepath, filesize = self._resolve_location(location)
msg = _("Found image at %s.") % filepath
LOG.debug(msg)
return filesize
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises NotFound if image does not exist
:raises Forbidden if cannot delete because of permissions
"""
loc = location.store_location
fn = loc.path
if os.path.exists(fn):
try:
LOG.debug(_("Deleting image at %(fn)s"), {'fn': fn})
os.unlink(fn)
except OSError:
raise exception.Forbidden(_("You cannot delete file %s") % fn)
else:
raise exception.NotFound(_("Image file %s does not exist") % fn)
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, checksum
and a dictionary with storage system specific information
:raises `glance.common.exception.Duplicate` if the image already
existed
:note By default, the backend writes the image data to a file
`/<DATADIR>/<ID>`, where <DATADIR> is the value of
the filesystem_store_datadir configuration option and <ID>
is the supplied image ID.
"""
filepath = os.path.join(self.datadir, str(image_id))
if os.path.exists(filepath):
raise exception.Duplicate(_("Image file %s already exists!")
% filepath)
checksum = hashlib.md5()
bytes_written = 0
try:
with open(filepath, 'wb') as f:
for buf in utils.chunkreadable(image_file,
ChunkedFile.CHUNKSIZE):
bytes_written += len(buf)
checksum.update(buf)
f.write(buf)
except IOError as e:
if e.errno != errno.EACCES:
self._delete_partial(filepath, image_id)
exceptions = {errno.EFBIG: exception.StorageFull(),
errno.ENOSPC: exception.StorageFull(),
errno.EACCES: exception.StorageWriteDenied()}
raise exceptions.get(e.errno, e)
except Exception:
self._delete_partial(filepath, image_id)
raise
checksum_hex = checksum.hexdigest()
metadata = self._get_metadata()
LOG.debug(_("Wrote %(bytes_written)d bytes to %(filepath)s with "
"checksum %(checksum_hex)s"),
{'bytes_written': bytes_written,
'filepath': filepath,
'checksum_hex': checksum_hex})
return ('file://%s' % filepath, bytes_written, checksum_hex, metadata)
@staticmethod
def _delete_partial(filepath, id):
try:
os.unlink(filepath)
except Exception as e:
msg = _('Unable to remove partial image data for image %s: %s')
LOG.error(msg % (id, e))

212
glance/store/gridfs.py Normal file
View File

@ -0,0 +1,212 @@
# Copyright 2013 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for GridFS"""
from __future__ import absolute_import
from oslo.config import cfg
import urlparse
from glance.common import exception
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
import glance.store.base
import glance.store.location
try:
import gridfs
import gridfs.errors
import pymongo
import pymongo.uri_parser as uri_parser
except ImportError:
pymongo = None
LOG = logging.getLogger(__name__)
gridfs_opts = [
cfg.StrOpt('mongodb_store_uri',
help="Hostname or IP address of the instance to connect to, "
"or a mongodb URI, or a list of hostnames / mongodb URIs. "
"If host is an IPv6 literal it must be enclosed "
"in '[' and ']' characters following the RFC2732 "
"URL syntax (e.g. '[::1]' for localhost)"),
cfg.StrOpt('mongodb_store_db', default=None, help='Database to use'),
]
CONF = cfg.CONF
CONF.register_opts(gridfs_opts)
class StoreLocation(glance.store.location.StoreLocation):
"""
Class describing an gridfs URI:
gridfs://<IMAGE_ID>
Connection information has been consciously omitted for
security reasons, since this location will be stored in glance's
database and can be queried from outside.
Note(flaper87): Make connection info available if user wants so
by adding a new configuration parameter `mongdb_store_insecure`.
"""
def get_uri(self):
return "gridfs://%s" % self.specs.get("image_id")
def parse_uri(self, uri):
"""
This method should fix any issue with the passed URI. Right now,
it just sets image_id value in the specs dict.
:param uri: Current set URI
"""
parsed = urlparse.urlparse(uri)
assert parsed.scheme in ('gridfs',)
self.specs["image_id"] = parsed.netloc
class Store(glance.store.base.Store):
"""GridFS adapter"""
EXAMPLE_URL = "gridfs://<IMAGE_ID>"
def get_schemes(self):
return ('gridfs',)
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
if pymongo is None:
msg = _("Missing dependencies: pymongo")
raise exception.BadStoreConfiguration(store_name="gridfs",
reason=msg)
self.mongodb_uri = self._option_get('mongodb_store_uri')
parsed = uri_parser.parse_uri(self.mongodb_uri)
self.mongodb_db = self._option_get('mongodb_store_db') or \
parsed.get("database")
self.mongodb = pymongo.MongoClient(self.mongodb_uri)
self.fs = gridfs.GridFS(self.mongodb[self.mongodb_db])
def _option_get(self, param):
result = getattr(CONF, param)
if not result:
reason = (_("Could not find %(param)s in configuration "
"options.") % {'param': param})
LOG.debug(reason)
raise exception.BadStoreConfiguration(store_name="gridfs",
reason=reason)
return result
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
image = self._get_file(location)
return (image, image.length)
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns the image_size (or 0
if unavailable)
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
"""
try:
key = self._get_file(location)
return key.length
except Exception:
return 0
def _get_file(self, location):
store_location = location
if isinstance(location, glance.store.location.Location):
store_location = location.store_location
try:
parsed = urlparse.urlparse(store_location.get_uri())
return self.fs.get(parsed.netloc)
except gridfs.errors.NoFile:
msg = _("Could not find %s image in GridFS") % \
store_location.get_uri()
LOG.debug(msg)
raise exception.NotFound(msg)
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, checksum
and a dictionary with storage system specific information
:raises `glance.common.exception.Duplicate` if the image already
existed
"""
loc = StoreLocation({'image_id': image_id})
if self.fs.exists(image_id):
raise exception.Duplicate(_("GridFS already has an image at "
"location %s") % loc.get_uri())
LOG.debug(_("Adding a new image to GridFS with id %s and size %s") %
(image_id, image_size))
try:
self.fs.put(image_file, _id=image_id)
image = self._get_file(loc)
except Exception:
# Note(zhiyan): clean up already received data when
# error occurs such as ImageSizeLimitExceeded exception.
with excutils.save_and_reraise_exception():
self.fs.delete(image_id)
LOG.debug(_("Uploaded image %s, md5 %s, length %s to GridFS") %
(image._id, image.md5, image.length))
return (loc.get_uri(), image.length, image.md5, {})
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises NotFound if image does not exist
"""
image = self._get_file(location)
self.fs.delete(image._id)
LOG.debug("Deleted image %s from GridFS")

192
glance/store/http.py Normal file
View File

@ -0,0 +1,192 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import urlparse
from glance.common import exception
import glance.openstack.common.log as logging
import glance.store.base
import glance.store.location
LOG = logging.getLogger(__name__)
MAX_REDIRECTS = 5
class StoreLocation(glance.store.location.StoreLocation):
"""Class describing an HTTP(S) URI"""
def process_specs(self):
self.scheme = self.specs.get('scheme', 'http')
self.netloc = self.specs['netloc']
self.user = self.specs.get('user')
self.password = self.specs.get('password')
self.path = self.specs.get('path')
def _get_credstring(self):
if self.user:
return '%s:%s@' % (self.user, self.password)
return ''
def get_uri(self):
return "%s://%s%s%s" % (
self.scheme,
self._get_credstring(),
self.netloc,
self.path)
def parse_uri(self, uri):
"""
Parse URLs. This method fixes an issue where credentials specified
in the URL are interpreted differently in Python 2.6.1+ than prior
versions of Python.
"""
pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('https', 'http')
self.scheme = pieces.scheme
netloc = pieces.netloc
path = pieces.path
try:
if '@' in netloc:
creds, netloc = netloc.split('@')
else:
creds = None
except ValueError:
# Python 2.6.1 compat
# see lp659445 and Python issue7904
if '@' in path:
creds, path = path.split('@')
else:
creds = None
if creds:
try:
self.user, self.password = creds.split(':')
except ValueError:
reason = (_("Credentials '%s' not well-formatted.")
% "".join(creds))
LOG.debug(reason)
raise exception.BadStoreUri()
else:
self.user = None
if netloc == '':
reason = _("No address specified in HTTP URL")
LOG.debug(reason)
raise exception.BadStoreUri(message=reason)
self.netloc = netloc
self.path = path
def http_response_iterator(conn, response, size):
"""
Return an iterator for a file-like object.
:param conn: HTTP(S) Connection
:param response: httplib.HTTPResponse object
:param size: Chunk size to iterate with
"""
chunk = response.read(size)
while chunk:
yield chunk
chunk = response.read(size)
conn.close()
class Store(glance.store.base.Store):
"""An implementation of the HTTP(S) Backend Adapter"""
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
"""
conn, resp, content_length = self._query(location, 'GET')
iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)
class ResponseIndexable(glance.store.Indexable):
def another(self):
try:
return self.wrapped.next()
except StopIteration:
return ''
return (ResponseIndexable(iterator, content_length), content_length)
def get_schemes(self):
return ('http', 'https')
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns the size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
"""
try:
return self._query(location, 'HEAD')[2]
except Exception:
return 0
def _query(self, location, verb, depth=0):
if depth > MAX_REDIRECTS:
reason = (_("The HTTP URL exceeded %s maximum "
"redirects.") % MAX_REDIRECTS)
LOG.debug(reason)
raise exception.MaxRedirectsExceeded(redirects=MAX_REDIRECTS)
loc = location.store_location
conn_class = self._get_conn_class(loc)
conn = conn_class(loc.netloc)
conn.request(verb, loc.path, "", {})
resp = conn.getresponse()
# Check for bad status codes
if resp.status >= 400:
reason = _("HTTP URL returned a %s status code.") % resp.status
LOG.debug(reason)
raise exception.BadStoreUri(loc.path, reason)
location_header = resp.getheader("location")
if location_header:
if resp.status not in (301, 302):
reason = (_("The HTTP URL attempted to redirect with an "
"invalid %s status code.") % resp.status)
LOG.debug(reason)
raise exception.BadStoreUri(loc.path, reason)
location_class = glance.store.location.Location
new_loc = location_class(location.store_name,
location.store_location.__class__,
uri=location_header,
image_id=location.image_id,
store_specs=location.store_specs)
return self._query(new_loc, verb, depth + 1)
content_length = int(resp.getheader('content-length', 0))
return (conn, resp, content_length)
def _get_conn_class(self, loc):
"""
Returns connection class for accessing the resource. Useful
for dependency injection and stubouts in testing...
"""
return {'http': httplib.HTTPConnection,
'https': httplib.HTTPSConnection}[loc.scheme]

163
glance/store/location.py Normal file
View File

@ -0,0 +1,163 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A class that describes the location of an image in Glance.
In Glance, an image can either be **stored** in Glance, or it can be
**registered** in Glance but actually be stored somewhere else.
We needed a class that could support the various ways that Glance
describes where exactly an image is stored.
An image in Glance has two location properties: the image URI
and the image storage URI.
The image URI is essentially the permalink identifier for the image.
It is displayed in the output of various Glance API calls and,
while read-only, is entirely user-facing. It shall **not** contain any
security credential information at all. The Glance image URI shall
be the host:port of that Glance API server along with /images/<IMAGE_ID>.
The Glance storage URI is an internal URI structure that Glance
uses to maintain critical information about how to access the images
that it stores in its storage backends. It **may contain** security
credentials and is **not** user-facing.
"""
import urlparse
from glance.common import exception
import glance.openstack.common.log as logging
LOG = logging.getLogger(__name__)
SCHEME_TO_CLS_MAP = {}
def get_location_from_uri(uri):
"""
Given a URI, return a Location object that has had an appropriate
store parse the URI.
:param uri: A URI that could come from the end-user in the Location
attribute/header
Example URIs:
https://user:pass@example.com:80/images/some-id
http://images.oracle.com/123456
swift://example.com/container/obj-id
swift://user:account:pass@authurl.com/container/obj-id
swift+http://user:account:pass@authurl.com/container/obj-id
s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
file:///var/lib/glance/images/1
cinder://volume-id
"""
pieces = urlparse.urlparse(uri)
if pieces.scheme not in SCHEME_TO_CLS_MAP.keys():
raise exception.UnknownScheme(scheme=pieces.scheme)
scheme_info = SCHEME_TO_CLS_MAP[pieces.scheme]
return Location(pieces.scheme, uri=uri,
store_location_class=scheme_info['location_class'])
def register_scheme_map(scheme_map):
"""
Given a mapping of 'scheme' to store_name, adds the mapping to the
known list of schemes if it does not already exist.
"""
for (k, v) in scheme_map.items():
if k not in SCHEME_TO_CLS_MAP:
LOG.debug("Registering scheme %s with %s", k, v)
SCHEME_TO_CLS_MAP[k] = v
class Location(object):
"""
Class describing the location of an image that Glance knows about
"""
def __init__(self, store_name, store_location_class,
uri=None, image_id=None, store_specs=None):
"""
Create a new Location object.
:param store_name: The string identifier/scheme of the storage backend
:param store_location_class: The store location class to use
for this location instance.
:param image_id: The identifier of the image in whatever storage
backend is used.
:param uri: Optional URI to construct location from
:param store_specs: Dictionary of information about the location
of the image that is dependent on the backend
store
"""
self.store_name = store_name
self.image_id = image_id
self.store_specs = store_specs or {}
self.store_location = store_location_class(self.store_specs)
if uri:
self.store_location.parse_uri(uri)
def get_store_uri(self):
"""
Returns the Glance image URI, which is the host:port of the API server
along with /images/<IMAGE_ID>
"""
return self.store_location.get_uri()
def get_uri(self):
return None
class StoreLocation(object):
"""
Base class that must be implemented by each store
"""
def __init__(self, store_specs):
self.specs = store_specs
if self.specs:
self.process_specs()
def process_specs(self):
"""
Subclasses should implement any processing of the self.specs collection
such as storing credentials and possibly establishing connections.
"""
pass
def get_uri(self):
"""
Subclasses should implement a method that returns an internal URI that,
when supplied to the StoreLocation instance, can be interpreted by the
StoreLocation's parse_uri() method. The URI returned from this method
shall never be public and only used internally within Glance, so it is
fine to encode credentials in this URI.
"""
raise NotImplementedError("StoreLocation subclass must implement "
"get_uri()")
def parse_uri(self, uri):
"""
Subclasses should implement a method that accepts a string URI and
sets appropriate internal fields such that a call to get_uri() will
return a proper internal URI
"""
raise NotImplementedError("StoreLocation subclass must implement "
"parse_uri()")

390
glance/store/rbd.py Normal file
View File

@ -0,0 +1,390 @@
# Copyright 2010-2011 Josh Durgin
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for RBD
(RADOS (Reliable Autonomic Distributed Object Store) Block Device)"""
from __future__ import absolute_import
from __future__ import with_statement
import hashlib
import math
import urllib
from oslo.config import cfg
from glance.common import exception
from glance.common import utils
import glance.openstack.common.log as logging
from glance.openstack.common import units
import glance.store.base
import glance.store.location
try:
import rados
import rbd
except ImportError:
rados = None
rbd = None
DEFAULT_POOL = 'images'
DEFAULT_CONFFILE = '/etc/ceph/ceph.conf'
DEFAULT_USER = None # let librados decide based on the Ceph conf file
DEFAULT_CHUNKSIZE = 8 # in MiB
DEFAULT_SNAPNAME = 'snap'
LOG = logging.getLogger(__name__)
rbd_opts = [
cfg.IntOpt('rbd_store_chunk_size', default=DEFAULT_CHUNKSIZE,
help=_('RADOS images will be chunked into objects of this size '
'(in megabytes). For best performance, this should be '
'a power of two.')),
cfg.StrOpt('rbd_store_pool', default=DEFAULT_POOL,
help=_('RADOS pool in which images are stored.')),
cfg.StrOpt('rbd_store_user', default=DEFAULT_USER,
help=_('RADOS user to authenticate as (only applicable if '
'using Cephx. If <None>, a default will be chosen based '
'on the client. section in rbd_store_ceph_conf)')),
cfg.StrOpt('rbd_store_ceph_conf', default=DEFAULT_CONFFILE,
help=_('Ceph configuration file path. '
'If <None>, librados will locate the default config. '
'If using cephx authentication, this file should '
'include a reference to the right keyring '
'in a client.<USER> section')),
]
CONF = cfg.CONF
CONF.register_opts(rbd_opts)
class StoreLocation(glance.store.location.StoreLocation):
"""
Class describing a RBD URI. This is of the form:
rbd://image
or
rbd://fsid/pool/image/snapshot
"""
def process_specs(self):
# convert to ascii since librbd doesn't handle unicode
for key, value in self.specs.iteritems():
self.specs[key] = str(value)
self.fsid = self.specs.get('fsid')
self.pool = self.specs.get('pool')
self.image = self.specs.get('image')
self.snapshot = self.specs.get('snapshot')
def get_uri(self):
if self.fsid and self.pool and self.snapshot:
# ensure nothing contains / or any other url-unsafe character
safe_fsid = urllib.quote(self.fsid, '')
safe_pool = urllib.quote(self.pool, '')
safe_image = urllib.quote(self.image, '')
safe_snapshot = urllib.quote(self.snapshot, '')
return "rbd://%s/%s/%s/%s" % (safe_fsid, safe_pool,
safe_image, safe_snapshot)
else:
return "rbd://%s" % self.image
def parse_uri(self, uri):
prefix = 'rbd://'
if not uri.startswith(prefix):
reason = _('URI must start with rbd://')
msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri,
'reason': reason})
LOG.debug(msg)
raise exception.BadStoreUri(message=reason)
# convert to ascii since librbd doesn't handle unicode
try:
ascii_uri = str(uri)
except UnicodeError:
reason = _('URI contains non-ascii characters')
msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri,
'reason': reason})
LOG.debug(msg)
raise exception.BadStoreUri(message=reason)
pieces = ascii_uri[len(prefix):].split('/')
if len(pieces) == 1:
self.fsid, self.pool, self.image, self.snapshot = \
(None, None, pieces[0], None)
elif len(pieces) == 4:
self.fsid, self.pool, self.image, self.snapshot = \
map(urllib.unquote, pieces)
else:
reason = _('URI must have exactly 1 or 4 components')
msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri,
'reason': reason})
LOG.debug(msg)
raise exception.BadStoreUri(message=reason)
if any(map(lambda p: p == '', pieces)):
reason = _('URI cannot contain empty components')
msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri,
'reason': reason})
LOG.debug(msg)
raise exception.BadStoreUri(message=reason)
class ImageIterator(object):
"""
Reads data from an RBD image, one chunk at a time.
"""
def __init__(self, name, store):
self.name = name
self.pool = store.pool
self.user = store.user
self.conf_file = store.conf_file
self.chunk_size = store.chunk_size
def __iter__(self):
try:
with rados.Rados(conffile=self.conf_file,
rados_id=self.user) as conn:
with conn.open_ioctx(self.pool) as ioctx:
with rbd.Image(ioctx, self.name) as image:
img_info = image.stat()
size = img_info['size']
bytes_left = size
while bytes_left > 0:
length = min(self.chunk_size, bytes_left)
data = image.read(size - bytes_left, length)
bytes_left -= len(data)
yield data
raise StopIteration()
except rbd.ImageNotFound:
raise exception.NotFound(
_('RBD image %s does not exist') % self.name)
class Store(glance.store.base.Store):
"""An implementation of the RBD backend adapter."""
EXAMPLE_URL = "rbd://<FSID>/<POOL>/<IMAGE>/<SNAP>"
def get_schemes(self):
return ('rbd',)
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
try:
self.chunk_size = CONF.rbd_store_chunk_size * units.Mi
# these must not be unicode since they will be passed to a
# non-unicode-aware C library
self.pool = str(CONF.rbd_store_pool)
self.user = str(CONF.rbd_store_user)
self.conf_file = str(CONF.rbd_store_ceph_conf)
except cfg.ConfigFileValueError as e:
reason = _("Error in store configuration: %s") % e
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name='rbd',
reason=reason)
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
loc = location.store_location
return (ImageIterator(loc.image, self), self.get_size(location))
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns the size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
loc = location.store_location
with rados.Rados(conffile=self.conf_file,
rados_id=self.user) as conn:
with conn.open_ioctx(self.pool) as ioctx:
try:
with rbd.Image(ioctx, loc.image,
snapshot=loc.snapshot) as image:
img_info = image.stat()
return img_info['size']
except rbd.ImageNotFound:
msg = _('RBD image %s does not exist') % loc.get_uri()
LOG.debug(msg)
raise exception.NotFound(msg)
def _create_image(self, fsid, ioctx, image_name, size, order):
"""
Create an rbd image. If librbd supports it,
make it a cloneable snapshot, so that copy-on-write
volumes can be created from it.
:param image_name Image's name
:retval `glance.store.rbd.StoreLocation` object
"""
librbd = rbd.RBD()
if hasattr(rbd, 'RBD_FEATURE_LAYERING'):
librbd.create(ioctx, image_name, size, order, old_format=False,
features=rbd.RBD_FEATURE_LAYERING)
return StoreLocation({
'fsid': fsid,
'pool': self.pool,
'image': image_name,
'snapshot': DEFAULT_SNAPNAME,
})
else:
librbd.create(ioctx, image_name, size, order, old_format=True)
return StoreLocation({'image': image_name})
def _delete_image(self, image_name, snapshot_name=None):
"""
Delete RBD image and snapshot.
:param image_name Image's name
:param snapshot_name Image snapshot's name
:raises NotFound if image does not exist;
InUseByStore if image is in use or snapshot unprotect failed
"""
with rados.Rados(conffile=self.conf_file, rados_id=self.user) as conn:
with conn.open_ioctx(self.pool) as ioctx:
try:
# First remove snapshot.
if snapshot_name is not None:
with rbd.Image(ioctx, image_name) as image:
try:
image.unprotect_snap(snapshot_name)
except rbd.ImageBusy:
log_msg = _("snapshot %(image)s@%(snap)s "
"could not be unprotected because "
"it is in use")
LOG.debug(log_msg %
{'image': image_name,
'snap': snapshot_name})
raise exception.InUseByStore()
image.remove_snap(snapshot_name)
# Then delete image.
rbd.RBD().remove(ioctx, image_name)
except rbd.ImageNotFound:
raise exception.NotFound(
_("RBD image %s does not exist") % image_name)
except rbd.ImageBusy:
log_msg = _("image %s could not be removed "
"because it is in use")
LOG.debug(log_msg % image_name)
raise exception.InUseByStore()
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, checksum
and a dictionary with storage system specific information
:raises `glance.common.exception.Duplicate` if the image already
existed
"""
checksum = hashlib.md5()
image_name = str(image_id)
with rados.Rados(conffile=self.conf_file, rados_id=self.user) as conn:
fsid = None
if hasattr(conn, 'get_fsid'):
fsid = conn.get_fsid()
with conn.open_ioctx(self.pool) as ioctx:
order = int(math.log(self.chunk_size, 2))
LOG.debug('creating image %s with order %d and size %d',
image_name, order, image_size)
if image_size == 0:
LOG.warning(_("since image size is zero we will be doing "
"resize-before-write for each chunk which "
"will be considerably slower than normal"))
try:
loc = self._create_image(fsid, ioctx, image_name,
image_size, order)
except rbd.ImageExists:
raise exception.Duplicate(
_('RBD image %s already exists') % image_id)
try:
with rbd.Image(ioctx, image_name) as image:
bytes_written = 0
offset = 0
chunks = utils.chunkreadable(image_file,
self.chunk_size)
for chunk in chunks:
# If the image size provided is zero we need to do
# a resize for the amount we are writing. This will
# be slower so setting a higher chunk size may
# speed things up a bit.
if image_size == 0:
chunk_length = len(chunk)
length = offset + chunk_length
bytes_written += chunk_length
LOG.debug(_("resizing image to %s KiB") %
(length / units.Ki))
image.resize(length)
LOG.debug(_("writing chunk at offset %s") %
(offset))
offset += image.write(chunk, offset)
checksum.update(chunk)
if loc.snapshot:
image.create_snap(loc.snapshot)
image.protect_snap(loc.snapshot)
except Exception as exc:
# Delete image if one was created
try:
self._delete_image(loc.image, loc.snapshot)
except exception.NotFound:
pass
raise exc
# Make sure we send back the image size whether provided or inferred.
if image_size == 0:
image_size = bytes_written
return (loc.get_uri(), image_size, checksum.hexdigest(), {})
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete.
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises NotFound if image does not exist;
InUseByStore if image is in use or snapshot unprotect failed
"""
loc = location.store_location
self._delete_image(loc.image, loc.snapshot)

542
glance/store/s3.py Normal file
View File

@ -0,0 +1,542 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for S3 or Storage Servers that follow the S3 Protocol"""
import hashlib
import httplib
import re
import tempfile
import urlparse
from oslo.config import cfg
from glance.common import exception
from glance.common import utils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
import glance.store.location
LOG = logging.getLogger(__name__)
s3_opts = [
cfg.StrOpt('s3_store_host',
help=_('The host where the S3 server is listening.')),
cfg.StrOpt('s3_store_access_key', secret=True,
help=_('The S3 query token access key.')),
cfg.StrOpt('s3_store_secret_key', secret=True,
help=_('The S3 query token secret key.')),
cfg.StrOpt('s3_store_bucket',
help=_('The S3 bucket to be used to store the Glance data.')),
cfg.StrOpt('s3_store_object_buffer_dir',
help=_('The local directory where uploads will be staged '
'before they are transferred into S3.')),
cfg.BoolOpt('s3_store_create_bucket_on_put', default=False,
help=_('A boolean to determine if the S3 bucket should be '
'created on upload if it does not exist or if '
'an error should be returned to the user.')),
cfg.StrOpt('s3_store_bucket_url_format', default='subdomain',
help=_('The S3 calling format used to determine the bucket. '
'Either subdomain or path can be used.')),
]
CONF = cfg.CONF
CONF.register_opts(s3_opts)
class StoreLocation(glance.store.location.StoreLocation):
"""
Class describing an S3 URI. An S3 URI can look like any of
the following:
s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
s3+http://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
The s3+https:// URIs indicate there is an HTTPS s3service URL
"""
def process_specs(self):
self.scheme = self.specs.get('scheme', 's3')
self.accesskey = self.specs.get('accesskey')
self.secretkey = self.specs.get('secretkey')
s3_host = self.specs.get('s3serviceurl')
self.bucket = self.specs.get('bucket')
self.key = self.specs.get('key')
if s3_host.startswith('https://'):
self.scheme = 's3+https'
s3_host = s3_host[8:].strip('/')
elif s3_host.startswith('http://'):
s3_host = s3_host[7:].strip('/')
self.s3serviceurl = s3_host.strip('/')
def _get_credstring(self):
if self.accesskey:
return '%s:%s@' % (self.accesskey, self.secretkey)
return ''
def get_uri(self):
return "%s://%s%s/%s/%s" % (
self.scheme,
self._get_credstring(),
self.s3serviceurl,
self.bucket,
self.key)
def parse_uri(self, uri):
"""
Parse URLs. This method fixes an issue where credentials specified
in the URL are interpreted differently in Python 2.6.1+ than prior
versions of Python.
Note that an Amazon AWS secret key can contain the forward slash,
which is entirely retarded, and breaks urlparse miserably.
This function works around that issue.
"""
# Make sure that URIs that contain multiple schemes, such as:
# s3://accesskey:secretkey@https://s3.amazonaws.com/bucket/key-id
# are immediately rejected.
if uri.count('://') != 1:
reason = _("URI cannot contain more than one occurrence "
"of a scheme. If you have specified a URI like "
"s3://accesskey:secretkey@"
"https://s3.amazonaws.com/bucket/key-id"
", you need to change it to use the "
"s3+https:// scheme, like so: "
"s3+https://accesskey:secretkey@"
"s3.amazonaws.com/bucket/key-id")
LOG.debug(_("Invalid store uri: %s") % reason)
raise exception.BadStoreUri(message=reason)
pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('s3', 's3+http', 's3+https')
self.scheme = pieces.scheme
path = pieces.path.strip('/')
netloc = pieces.netloc.strip('/')
entire_path = (netloc + '/' + path).strip('/')
if '@' in uri:
creds, path = entire_path.split('@')
cred_parts = creds.split(':')
try:
access_key = cred_parts[0]
secret_key = cred_parts[1]
# NOTE(jaypipes): Need to encode to UTF-8 here because of a
# bug in the HMAC library that boto uses.
# See: http://bugs.python.org/issue5285
# See: http://trac.edgewall.org/ticket/8083
access_key = access_key.encode('utf-8')
secret_key = secret_key.encode('utf-8')
self.accesskey = access_key
self.secretkey = secret_key
except IndexError:
reason = _("Badly formed S3 credentials %s") % creds
LOG.debug(reason)
raise exception.BadStoreUri()
else:
self.accesskey = None
path = entire_path
try:
path_parts = path.split('/')
self.key = path_parts.pop()
self.bucket = path_parts.pop()
if path_parts:
self.s3serviceurl = '/'.join(path_parts).strip('/')
else:
reason = _("Badly formed S3 URI. Missing s3 service URL.")
raise exception.BadStoreUri()
except IndexError:
reason = _("Badly formed S3 URI: %s") % uri
LOG.debug(reason)
raise exception.BadStoreUri()
class ChunkedFile(object):
"""
We send this back to the Glance API server as
something that can iterate over a ``boto.s3.key.Key``
"""
CHUNKSIZE = 65536
def __init__(self, fp):
self.fp = fp
def __iter__(self):
"""Return an iterator over the image file"""
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
if chunk:
yield chunk
else:
break
finally:
self.close()
def getvalue(self):
"""Return entire string value... used in testing."""
data = ""
self.len = 0
for chunk in self:
read_bytes = len(chunk)
data = data + chunk
self.len = self.len + read_bytes
return data
def close(self):
"""Close the internal file pointer."""
if self.fp:
self.fp.close()
self.fp = None
class Store(glance.store.base.Store):
"""An implementation of the s3 adapter."""
EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>"
def get_schemes(self):
return ('s3', 's3+http', 's3+https')
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
self.s3_host = self._option_get('s3_store_host')
access_key = self._option_get('s3_store_access_key')
secret_key = self._option_get('s3_store_secret_key')
# NOTE(jaypipes): Need to encode to UTF-8 here because of a
# bug in the HMAC library that boto uses.
# See: http://bugs.python.org/issue5285
# See: http://trac.edgewall.org/ticket/8083
self.access_key = access_key.encode('utf-8')
self.secret_key = secret_key.encode('utf-8')
self.bucket = self._option_get('s3_store_bucket')
self.scheme = 's3'
if self.s3_host.startswith('https://'):
self.scheme = 's3+https'
self.full_s3_host = self.s3_host
elif self.s3_host.startswith('http://'):
self.full_s3_host = self.s3_host
else: # Defaults http
self.full_s3_host = 'http://' + self.s3_host
self.s3_store_object_buffer_dir = CONF.s3_store_object_buffer_dir
def _option_get(self, param):
result = getattr(CONF, param)
if not result:
reason = (_("Could not find %(param)s in configuration "
"options.") % {'param': param})
LOG.debug(reason)
raise exception.BadStoreConfiguration(store_name="s3",
reason=reason)
return result
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a tuple of generator
(for reading the image file) and image_size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
key = self._retrieve_key(location)
key.BufferSize = self.CHUNKSIZE
class ChunkedIndexable(glance.store.Indexable):
def another(self):
return (self.wrapped.fp.read(ChunkedFile.CHUNKSIZE)
if self.wrapped.fp else None)
return (ChunkedIndexable(ChunkedFile(key), key.size), key.size)
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns the image_size (or 0
if unavailable)
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
"""
try:
key = self._retrieve_key(location)
return key.size
except Exception:
return 0
def _retrieve_key(self, location):
loc = location.store_location
from boto.s3.connection import S3Connection
s3_conn = S3Connection(loc.accesskey, loc.secretkey,
host=loc.s3serviceurl,
is_secure=(loc.scheme == 's3+https'),
calling_format=get_calling_format())
bucket_obj = get_bucket(s3_conn, loc.bucket)
key = get_key(bucket_obj, loc.key)
msg = _("Retrieved image object from S3 using (s3_host=%(s3_host)s, "
"access_key=%(accesskey)s, bucket=%(bucket)s, "
"key=%(obj_name)s)") % ({'s3_host': loc.s3serviceurl,
'accesskey': loc.accesskey,
'bucket': loc.bucket,
'obj_name': loc.key})
LOG.debug(msg)
return key
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, checksum
and a dictionary with storage system specific information
:raises `glance.common.exception.Duplicate` if the image already
existed
S3 writes the image data using the scheme:
s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>
where:
<USER> = ``s3_store_user``
<KEY> = ``s3_store_key``
<S3_HOST> = ``s3_store_host``
<BUCKET> = ``s3_store_bucket``
<ID> = The id of the image being added
"""
from boto.s3.connection import S3Connection
loc = StoreLocation({'scheme': self.scheme,
'bucket': self.bucket,
'key': image_id,
's3serviceurl': self.full_s3_host,
'accesskey': self.access_key,
'secretkey': self.secret_key})
s3_conn = S3Connection(loc.accesskey, loc.secretkey,
host=loc.s3serviceurl,
is_secure=(loc.scheme == 's3+https'),
calling_format=get_calling_format())
create_bucket_if_missing(self.bucket, s3_conn)
bucket_obj = get_bucket(s3_conn, self.bucket)
obj_name = str(image_id)
def _sanitize(uri):
return re.sub('//.*:.*@',
'//s3_store_secret_key:s3_store_access_key@',
uri)
key = bucket_obj.get_key(obj_name)
if key and key.exists():
raise exception.Duplicate(_("S3 already has an image at "
"location %s") %
_sanitize(loc.get_uri()))
msg = _("Adding image object to S3 using (s3_host=%(s3_host)s, "
"access_key=%(access_key)s, bucket=%(bucket)s, "
"key=%(obj_name)s)") % ({'s3_host': self.s3_host,
'access_key': self.access_key,
'bucket': self.bucket,
'obj_name': obj_name})
LOG.debug(msg)
key = bucket_obj.new_key(obj_name)
# We need to wrap image_file, which is a reference to the
# webob.Request.body_file, with a seekable file-like object,
# otherwise the call to set_contents_from_file() will die
# with an error about Input object has no method 'seek'. We
# might want to call webob.Request.make_body_seekable(), but
# unfortunately, that method copies the entire image into
# memory and results in LP Bug #818292 occurring. So, here
# we write temporary file in as memory-efficient manner as
# possible and then supply the temporary file to S3. We also
# take this opportunity to calculate the image checksum while
# writing the tempfile, so we don't need to call key.compute_md5()
msg = _("Writing request body file to temporary file "
"for %s") % _sanitize(loc.get_uri())
LOG.debug(msg)
tmpdir = self.s3_store_object_buffer_dir
temp_file = tempfile.NamedTemporaryFile(dir=tmpdir)
checksum = hashlib.md5()
for chunk in utils.chunkreadable(image_file, self.CHUNKSIZE):
checksum.update(chunk)
temp_file.write(chunk)
temp_file.flush()
msg = (_("Uploading temporary file to S3 for %s") %
_sanitize(loc.get_uri()))
LOG.debug(msg)
# OK, now upload the data into the key
key.set_contents_from_file(open(temp_file.name, 'r+b'), replace=False)
size = key.size
checksum_hex = checksum.hexdigest()
LOG.debug(_("Wrote %(size)d bytes to S3 key named %(obj_name)s "
"with checksum %(checksum_hex)s"),
{'size': size, 'obj_name': obj_name,
'checksum_hex': checksum_hex})
return (loc.get_uri(), size, checksum_hex, {})
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises NotFound if image does not exist
"""
loc = location.store_location
from boto.s3.connection import S3Connection
s3_conn = S3Connection(loc.accesskey, loc.secretkey,
host=loc.s3serviceurl,
is_secure=(loc.scheme == 's3+https'),
calling_format=get_calling_format())
bucket_obj = get_bucket(s3_conn, loc.bucket)
# Close the key when we're through.
key = get_key(bucket_obj, loc.key)
msg = _("Deleting image object from S3 using (s3_host=%(s3_host)s, "
"access_key=%(accesskey)s, bucket=%(bucket)s, "
"key=%(obj_name)s)") % ({'s3_host': loc.s3serviceurl,
'accesskey': loc.accesskey,
'bucket': loc.bucket,
'obj_name': loc.key})
LOG.debug(msg)
return key.delete()
def get_bucket(conn, bucket_id):
"""
Get a bucket from an s3 connection
:param conn: The ``boto.s3.connection.S3Connection``
:param bucket_id: ID of the bucket to fetch
:raises ``glance.exception.NotFound`` if bucket is not found.
"""
bucket = conn.get_bucket(bucket_id)
if not bucket:
msg = _("Could not find bucket with ID %s") % bucket_id
LOG.debug(msg)
raise exception.NotFound(msg)
return bucket
def get_s3_location(s3_host):
from boto.s3.connection import Location
locations = {
's3.amazonaws.com': Location.DEFAULT,
's3-eu-west-1.amazonaws.com': Location.EU,
's3-us-west-1.amazonaws.com': Location.USWest,
's3-ap-southeast-1.amazonaws.com': Location.APSoutheast,
's3-ap-northeast-1.amazonaws.com': Location.APNortheast,
}
# strip off scheme and port if present
key = re.sub('^(https?://)?(?P<host>[^:]+)(:[0-9]+)?$',
'\g<host>',
s3_host)
return locations.get(key, Location.DEFAULT)
def create_bucket_if_missing(bucket, s3_conn):
"""
Creates a missing bucket in S3 if the
``s3_store_create_bucket_on_put`` option is set.
:param bucket: Name of bucket to create
:param s3_conn: Connection to S3
"""
from boto.exception import S3ResponseError
try:
s3_conn.get_bucket(bucket)
except S3ResponseError as e:
if e.status == httplib.NOT_FOUND:
if CONF.s3_store_create_bucket_on_put:
location = get_s3_location(CONF.s3_store_host)
try:
s3_conn.create_bucket(bucket, location=location)
except S3ResponseError as e:
msg = (_("Failed to add bucket to S3.\n"
"Got error from S3: %(e)s") % {'e': e})
raise glance.store.BackendException(msg)
else:
msg = (_("The bucket %(bucket)s does not exist in "
"S3. Please set the "
"s3_store_create_bucket_on_put option "
"to add bucket to S3 automatically.")
% {'bucket': bucket})
raise glance.store.BackendException(msg)
def get_key(bucket, obj):
"""
Get a key from a bucket
:param bucket: The ``boto.s3.Bucket``
:param obj: Object to get the key for
:raises ``glance.exception.NotFound`` if key is not found.
"""
key = bucket.get_key(obj)
if not key or not key.exists():
msg = (_("Could not find key %(obj)s in bucket %(bucket)s") %
{'obj': obj, 'bucket': bucket})
LOG.debug(msg)
raise exception.NotFound(msg)
return key
def get_calling_format(bucket_format=None):
import boto.s3.connection
if bucket_format is None:
bucket_format = CONF.s3_store_bucket_url_format
if bucket_format.lower() == 'path':
return boto.s3.connection.OrdinaryCallingFormat()
else:
return boto.s3.connection.SubdomainCallingFormat()

523
glance/store/scrubber.py Normal file
View File

@ -0,0 +1,523 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import calendar
import eventlet
import os
import time
from oslo.config import cfg
from glance.common import crypt
from glance.common import exception
from glance.common import utils
from glance import context
from glance.openstack.common import lockutils
import glance.openstack.common.log as logging
import glance.registry.client.v1.api as registry
LOG = logging.getLogger(__name__)
scrubber_opts = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf')),
cfg.IntOpt('scrub_time', default=0,
help=_('The amount of time in seconds to delay before '
'performing a delete.')),
cfg.BoolOpt('cleanup_scrubber', default=False,
help=_('A boolean that determines if the scrubber should '
'clean up the files it uses for taking data. Only '
'one server in your deployment should be designated '
'the cleanup host.')),
cfg.IntOpt('cleanup_scrubber_time', default=86400,
help=_('Items must have a modified time that is older than '
'this value in order to be candidates for cleanup.'))
]
CONF = cfg.CONF
CONF.register_opts(scrubber_opts)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
class ScrubQueue(object):
"""Image scrub queue base class.
The queue contains image's location which need to delete from backend.
"""
def __init__(self):
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
@abc.abstractmethod
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
pass
@abc.abstractmethod
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
pass
class ScrubFileQueue(ScrubQueue):
"""File-based image scrub queue class."""
def __init__(self):
super(ScrubFileQueue, self).__init__()
self.scrubber_datadir = CONF.scrubber_datadir
utils.safe_mkdirs(self.scrubber_datadir)
self.scrub_time = CONF.scrub_time
self.metadata_encryption_key = CONF.metadata_encryption_key
def _read_queue_file(self, file_path):
"""Reading queue file to loading deleted location and timestamp out.
:param file_path: Queue file full path
:retval a list of image location timestamp tuple from queue file
"""
uris = []
delete_times = []
try:
with open(file_path, 'r') as f:
while True:
uri = f.readline().strip()
if uri:
uris.append(uri)
delete_times.append(int(f.readline().strip()))
else:
break
except Exception:
LOG.error(_("%s file can not be read.") % file_path)
return uris, delete_times
def _update_queue_file(self, file_path, remove_record_idxs):
"""Updating queue file to remove such queue records.
:param file_path: Queue file full path
:param remove_record_idxs: A list of record index those want to remove
"""
try:
with open(file_path, 'r') as f:
lines = f.readlines()
# NOTE(zhiyan) we need bottom up removing to
# keep record index be valid.
remove_record_idxs.sort(reverse=True)
for record_idx in remove_record_idxs:
# Each record has two lines
line_no = (record_idx + 1) * 2 - 1
del lines[line_no:line_no + 2]
with open(file_path, 'w') as f:
f.write(''.join(lines))
os.chmod(file_path, 0o600)
except Exception:
LOG.error(_("%s file can not be wrote.") % file_path)
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
# NOTE(zhiyan): make sure scrubber does not cleanup
# 'pending_delete' images concurrently before the code
# get lock and reach here.
try:
image = self.registry.get_image(image_id)
if image['status'] == 'deleted':
return
except exception.NotFound as e:
LOG.error(_("Failed to find image to delete: "
"%(e)s"), {'e': e})
return
delete_time = time.time() + self.scrub_time
file_path = os.path.join(self.scrubber_datadir, str(image_id))
if self.metadata_encryption_key is not None:
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
uri, 64)
if os.path.exists(file_path):
# Append the uri of location to the queue file
with open(file_path, 'a') as f:
f.write('\n')
f.write('\n'.join([uri, str(int(delete_time))]))
else:
# NOTE(zhiyan): Protect the file before we write any data.
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.utime(file_path, (delete_time, delete_time))
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image image_id and location tuple from scrub queue
"""
if not os.path.exists(self.scrubber_datadir):
LOG.info(_("%s directory does not exist.") % self.scrubber_datadir)
return []
ret = []
for root, dirs, files in os.walk(self.scrubber_datadir):
for image_id in files:
if not utils.is_uuid_like(image_id):
continue
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
file_path = os.path.join(self.scrubber_datadir, image_id)
uris, delete_times = self._read_queue_file(file_path)
remove_record_idxs = []
skipped = False
for (record_idx, delete_time) in enumerate(delete_times):
if delete_time > time.time():
skipped = True
continue
else:
ret.append((image_id, uris[record_idx]))
remove_record_idxs.append(record_idx)
if remove:
if skipped:
# NOTE(zhiyan): remove location records from
# the queue file.
self._update_queue_file(file_path,
remove_record_idxs)
else:
utils.safe_remove(file_path)
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
return os.path.exists(os.path.join(self.scrubber_datadir,
str(image_id)))
class ScrubDBQueue(ScrubQueue):
"""Database-based image scrub queue class."""
def __init__(self):
super(ScrubDBQueue, self).__init__()
self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
raise NotImplementedError
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image id and location tuple from scrub queue
"""
filters = {'deleted': True,
'is_public': 'none',
'status': 'pending_delete'}
ret = []
for image in self.registry.get_images_detailed(filters=filters):
deleted_at = image.get('deleted_at')
if not deleted_at:
continue
# NOTE: Strip off microseconds which may occur after the last '.,'
# Example: 2012-07-07T19:14:34.974216
date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
delete_time = calendar.timegm(time.strptime(date_str,
"%Y-%m-%dT%H:%M:%S"))
if delete_time + self.cleanup_scrubber_time > time.time():
continue
ret.extend([(image['id'], location['uri'])
for location in image['location_data']])
if remove:
self.registry.update_image(image['id'], {'status': 'deleted'})
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
try:
image = self.registry.get_image(image_id)
return image['status'] == 'pending_delete'
except exception.NotFound as e:
return False
_file_queue = None
_db_queue = None
def get_scrub_queues():
global _file_queue, _db_queue
if not _file_queue:
_file_queue = ScrubFileQueue()
if not _db_queue:
_db_queue = ScrubDBQueue()
return (_file_queue, _db_queue)
class Daemon(object):
def __init__(self, wakeup_time=300, threads=1000):
LOG.info(_("Starting Daemon: wakeup_time=%(wakeup_time)s "
"threads=%(threads)s"),
{'wakeup_time': wakeup_time, 'threads': threads})
self.wakeup_time = wakeup_time
self.event = eventlet.event.Event()
self.pool = eventlet.greenpool.GreenPool(threads)
def start(self, application):
self._run(application)
def wait(self):
try:
self.event.wait()
except KeyboardInterrupt:
msg = _("Daemon Shutdown on KeyboardInterrupt")
LOG.info(msg)
def _run(self, application):
LOG.debug(_("Running application"))
self.pool.spawn_n(application.run, self.pool, self.event)
eventlet.spawn_after(self.wakeup_time, self._run, application)
LOG.debug(_("Next run scheduled in %s seconds") % self.wakeup_time)
class Scrubber(object):
def __init__(self, store_api):
LOG.info(_("Initializing scrubber with configuration: %s") %
unicode({'scrubber_datadir': CONF.scrubber_datadir,
'cleanup': CONF.cleanup_scrubber,
'cleanup_time': CONF.cleanup_scrubber_time,
'registry_host': CONF.registry_host,
'registry_port': CONF.registry_port}))
utils.safe_mkdirs(CONF.scrubber_datadir)
self.store_api = store_api
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
(self.file_queue, self.db_queue) = get_scrub_queues()
def _get_delete_jobs(self, queue, pop):
try:
if pop:
image_id_uri_list = queue.pop_all_locations()
else:
image_id_uri_list = queue.get_all_locations()
except Exception:
LOG.error(_("Can not %s scrub jobs from queue.") %
'pop' if pop else 'get')
return None
delete_jobs = {}
for image_id, image_uri in image_id_uri_list:
if image_id not in delete_jobs:
delete_jobs[image_id] = []
delete_jobs[image_id].append((image_id, image_uri))
return delete_jobs
def run(self, pool, event=None):
delete_jobs = self._get_delete_jobs(self.file_queue, True)
if delete_jobs:
for image_id, jobs in delete_jobs.iteritems():
self._scrub_image(pool, image_id, jobs)
if CONF.cleanup_scrubber:
self._cleanup(pool)
def _scrub_image(self, pool, image_id, delete_jobs):
if len(delete_jobs) == 0:
return
LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") %
{'id': image_id, 'count': len(delete_jobs)})
# NOTE(bourke): The starmap must be iterated to do work
list(pool.starmap(self._delete_image_from_backend, delete_jobs))
image = self.registry.get_image(image_id)
if (image['status'] == 'pending_delete' and
not self.file_queue.has_image(image_id)):
self.registry.update_image(image_id, {'status': 'deleted'})
def _delete_image_from_backend(self, image_id, uri):
if CONF.metadata_encryption_key is not None:
uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
try:
LOG.debug(_("Deleting URI from image %(image_id)s.") %
{'image_id': image_id})
# Here we create a request context with credentials to support
# delayed delete when using multi-tenant backend storage
admin_tenant = CONF.admin_tenant_name
auth_token = self.registry.auth_tok
admin_context = context.RequestContext(user=CONF.admin_user,
tenant=admin_tenant,
auth_tok=auth_token)
self.store_api.delete_from_backend(admin_context, uri)
except Exception:
msg = _("Failed to delete URI from image %(image_id)s")
LOG.error(msg % {'image_id': image_id})
def _read_cleanup_file(self, file_path):
"""Reading cleanup to get latest cleanup timestamp.
:param file_path: Cleanup status file full path
:retval latest cleanup timestamp
"""
try:
if not os.path.exists(file_path):
msg = _("%s file is not exists.") % unicode(file_path)
raise Exception(msg)
atime = int(os.path.getatime(file_path))
mtime = int(os.path.getmtime(file_path))
if atime != mtime:
msg = _("%s file contains conflicting cleanup "
"timestamp.") % unicode(file_path)
raise Exception(msg)
return atime
except Exception as e:
LOG.error(e)
return None
def _update_cleanup_file(self, file_path, cleanup_time):
"""Update latest cleanup timestamp to cleanup file.
:param file_path: Cleanup status file full path
:param cleanup_time: The Latest cleanup timestamp
"""
try:
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
os.utime(file_path, (cleanup_time, cleanup_time))
except Exception:
LOG.error(_("%s file can not be created.") % unicode(file_path))
def _cleanup(self, pool):
now = time.time()
cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup")
if not os.path.exists(cleanup_file):
self._update_cleanup_file(cleanup_file, now)
return
last_cleanup_time = self._read_cleanup_file(cleanup_file)
cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time
if cleanup_time > now:
return
LOG.info(_("Getting images deleted before "
"%s") % CONF.cleanup_scrubber_time)
self._update_cleanup_file(cleanup_file, now)
delete_jobs = self._get_delete_jobs(self.db_queue, False)
if not delete_jobs:
return
for image_id, jobs in delete_jobs.iteritems():
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
if not self.file_queue.has_image(image_id):
# NOTE(zhiyan): scrubber should not cleanup this image
# since a queue file be created for this 'pending_delete'
# image concurrently before the code get lock and
# reach here. The checking only be worth if glance-api and
# glance-scrubber service be deployed on a same host.
self._scrub_image(pool, image_id, jobs)

307
glance/store/sheepdog.py Normal file
View File

@ -0,0 +1,307 @@
# Copyright 2013 Taobao Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for Sheepdog storage system"""
import hashlib
from oslo.config import cfg
from glance.common import exception
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
from glance.openstack.common import processutils
from glance.openstack.common import units
import glance.store
import glance.store.base
import glance.store.location
LOG = logging.getLogger(__name__)
DEFAULT_ADDR = 'localhost'
DEFAULT_PORT = 7000
DEFAULT_CHUNKSIZE = 64 # in MiB
LOG = logging.getLogger(__name__)
sheepdog_opts = [
cfg.IntOpt('sheepdog_store_chunk_size', default=DEFAULT_CHUNKSIZE,
help=_('Images will be chunked into objects of this size '
'(in megabytes). For best performance, this should be '
'a power of two.')),
cfg.IntOpt('sheepdog_store_port', default=DEFAULT_PORT,
help=_('Port of sheep daemon.')),
cfg.StrOpt('sheepdog_store_address', default=DEFAULT_ADDR,
help=_('IP address of sheep daemon.'))
]
CONF = cfg.CONF
CONF.register_opts(sheepdog_opts)
class SheepdogImage:
"""Class describing an image stored in Sheepdog storage."""
def __init__(self, addr, port, name, chunk_size):
self.addr = addr
self.port = port
self.name = name
self.chunk_size = chunk_size
def _run_command(self, command, data, *params):
cmd = ("collie vdi %(command)s -a %(addr)s -p %(port)d %(name)s "
"%(params)s" %
{"command": command,
"addr": self.addr,
"port": self.port,
"name": self.name,
"params": " ".join(map(str, params))})
try:
return processutils.execute(
cmd, process_input=data, shell=True)[0]
except processutils.ProcessExecutionError as exc:
LOG.error(exc)
raise glance.store.BackendException(exc)
def get_size(self):
"""
Return the size of the this iamge
Sheepdog Usage: collie vdi list -r -a address -p port image
"""
out = self._run_command("list -r", None)
return long(out.split(' ')[3])
def read(self, offset, count):
"""
Read up to 'count' bytes from this image starting at 'offset' and
return the data.
Sheepdog Usage: collie vdi read -a address -p port image offset len
"""
return self._run_command("read", None, str(offset), str(count))
def write(self, data, offset, count):
"""
Write up to 'count' bytes from the data to this image starting at
'offset'
Sheepdog Usage: collie vdi write -a address -p port image offset len
"""
self._run_command("write", data, str(offset), str(count))
def create(self, size):
"""
Create this image in the Sheepdog cluster with size 'size'.
Sheepdog Usage: collie vdi create -a address -p port image size
"""
self._run_command("create", None, str(size))
def delete(self):
"""
Delete this image in the Sheepdog cluster
Sheepdog Usage: collie vdi delete -a address -p port image
"""
self._run_command("delete", None)
def exist(self):
"""
Check if this image exists in the Sheepdog cluster via 'list' command
Sheepdog Usage: collie vdi list -r -a address -p port image
"""
out = self._run_command("list -r", None)
if not out:
return False
else:
return True
class StoreLocation(glance.store.location.StoreLocation):
"""
Class describing a Sheepdog URI. This is of the form:
sheepdog://image
"""
def process_specs(self):
self.image = self.specs.get('image')
def get_uri(self):
return "sheepdog://%s" % self.image
def parse_uri(self, uri):
if not uri.startswith('sheepdog://'):
raise exception.BadStoreUri(uri, "URI must start with %s://" %
'sheepdog')
self.image = uri[11:]
class ImageIterator(object):
"""
Reads data from an Sheepdog image, one chunk at a time.
"""
def __init__(self, image):
self.image = image
def __iter__(self):
image = self.image
total = left = image.get_size()
while left > 0:
length = min(image.chunk_size, left)
data = image.read(total - left, length)
left -= len(data)
yield data
raise StopIteration()
class Store(glance.store.base.Store):
"""Sheepdog backend adapter."""
EXAMPLE_URL = "sheepdog://image"
def get_schemes(self):
return ('sheepdog',)
def configure_add(self):
"""
Configure the Store to use the stored configuration options
Any store that needs special configuration should implement
this method. If the store was not able to successfully configure
itself, it should raise `exception.BadStoreConfiguration`
"""
try:
self.chunk_size = CONF.sheepdog_store_chunk_size * units.Mi
self.addr = CONF.sheepdog_store_address
self.port = CONF.sheepdog_store_port
except cfg.ConfigFileValueError as e:
reason = _("Error in store configuration: %s") % e
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name='sheepdog',
reason=reason)
try:
processutils.execute("collie", shell=True)
except processutils.ProcessExecutionError as exc:
reason = _("Error in store configuration: %s") % exc
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name='sheepdog',
reason=reason)
def get(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file, and returns a generator for reading
the image file
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
"""
loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist")
% image.name)
return (ImageIterator(image), image.get_size())
def get_size(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file and returns the image size
:param location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises `glance.exception.NotFound` if image does not exist
:rtype int
"""
loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist")
% image.name)
return image.get_size()
def add(self, image_id, image_file, image_size):
"""
Stores an image file with supplied identifier to the backend
storage system and returns a tuple containing information
about the stored image.
:param image_id: The opaque image identifier
:param image_file: The image data to write, as a file-like object
:param image_size: The size of the image data to write, in bytes
:retval tuple of URL in backing store, bytes written, and checksum
:raises `glance.common.exception.Duplicate` if the image already
existed
"""
image = SheepdogImage(self.addr, self.port, image_id,
self.chunk_size)
if image.exist():
raise exception.Duplicate(_("Sheepdog image %s already exists")
% image_id)
location = StoreLocation({'image': image_id})
checksum = hashlib.md5()
image.create(image_size)
try:
total = left = image_size
while left > 0:
length = min(self.chunk_size, left)
data = image_file.read(length)
image.write(data, total - left, length)
left -= length
checksum.update(data)
except Exception:
# Note(zhiyan): clean up already received data when
# error occurs such as ImageSizeLimitExceeded exception.
with excutils.save_and_reraise_exception():
image.delete()
return (location.get_uri(), image_size, checksum.hexdigest(), {})
def delete(self, location):
"""
Takes a `glance.store.location.Location` object that indicates
where to find the image file to delete
:location `glance.store.location.Location` object, supplied
from glance.store.location.get_location_from_uri()
:raises NotFound if image does not exist
"""
loc = location.store_location
image = SheepdogImage(self.addr, self.port, loc.image,
self.chunk_size)
if not image.exist():
raise exception.NotFound(_("Sheepdog image %s does not exist") %
loc.image)
image.delete()

687
glance/store/swift.py Normal file
View File

@ -0,0 +1,687 @@
# Copyright 2010-2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend for SWIFT"""
from __future__ import absolute_import
import hashlib
import httplib
import math
import urllib
import urlparse
from oslo.config import cfg
from glance.common import auth
from glance.common import exception
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
import glance.store.location
try:
import swiftclient
except ImportError:
pass
LOG = logging.getLogger(__name__)
DEFAULT_CONTAINER = 'glance'
DEFAULT_LARGE_OBJECT_SIZE = 5 * 1024 # 5GB
DEFAULT_LARGE_OBJECT_CHUNK_SIZE = 200 # 200M
ONE_MB = 1000 * 1024
swift_opts = [
cfg.BoolOpt('swift_enable_snet', default=False,
help=_('Whether to use ServiceNET to communicate with the '
'Swift storage servers.')),
cfg.StrOpt('swift_store_auth_address',
help=_('The address where the Swift authentication service '
'is listening.')),
cfg.StrOpt('swift_store_user', secret=True,
help=_('The user to authenticate against the Swift '
'authentication service')),
cfg.StrOpt('swift_store_key', secret=True,
help=_('Auth key for the user authenticating against the '
'Swift authentication service.')),
cfg.StrOpt('swift_store_auth_version', default='2',
help=_('Version of the authentication service to use. '
'Valid versions are 2 for keystone and 1 for swauth '
'and rackspace')),
cfg.BoolOpt('swift_store_auth_insecure', default=False,
help=_('If True, swiftclient won\'t check for a valid SSL '
'certificate when authenticating.')),
cfg.StrOpt('swift_store_region',
help=_('The region of the swift endpoint to be used for '
'single tenant. This setting is only necessary if the '
'tenant has multiple swift endpoints.')),
cfg.StrOpt('swift_store_endpoint_type', default='publicURL',
help=_('A string giving the endpoint type of the swift '
'service to use (publicURL, adminURL or internalURL). '
'This setting is only used if swift_store_auth_version '
'is 2.')),
cfg.StrOpt('swift_store_service_type', default='object-store',
help=_('A string giving the service type of the swift service '
'to use. This setting is only used if '
'swift_store_auth_version is 2.')),
cfg.StrOpt('swift_store_container',
default=DEFAULT_CONTAINER,
help=_('Container within the account that the account should '
'use for storing images in Swift.')),
cfg.IntOpt('swift_store_large_object_size',
default=DEFAULT_LARGE_OBJECT_SIZE,
help=_('The size, in MB, that Glance will start chunking image '
'files and do a large object manifest in Swift')),
cfg.IntOpt('swift_store_large_object_chunk_size',
default=DEFAULT_LARGE_OBJECT_CHUNK_SIZE,
help=_('The amount of data written to a temporary disk buffer '
'during the process of chunking the image file.')),
cfg.BoolOpt('swift_store_create_container_on_put', default=False,
help=_('A boolean value that determines if we create the '
'container if it does not exist.')),
cfg.BoolOpt('swift_store_multi_tenant', default=False,
help=_('If set to True, enables multi-tenant storage '
'mode which causes Glance images to be stored in '
'tenant specific Swift accounts.')),
cfg.ListOpt('swift_store_admin_tenants', default=[],
help=_('A list of tenants that will be granted read/write '
'access on all Swift containers created by Glance in '
'multi-tenant mode.')),
cfg.BoolOpt('swift_store_ssl_compression', default=True,
help=_('If set to False, disables SSL layer compression of '
'https swift requests. Setting to False may improve '
'performance for images which are already in a '
'compressed format, eg qcow2.')),
]
CONF = cfg.CONF
CONF.register_opts(swift_opts)
class StoreLocation(glance.store.location.StoreLocation):
"""
Class describing a Swift URI. A Swift URI can look like any of
the following:
swift://user:pass@authurl.com/container/obj-id
swift://account:user:pass@authurl.com/container/obj-id
swift+http://user:pass@authurl.com/container/obj-id
swift+https://user:pass@authurl.com/container/obj-id
When using multi-tenant a URI might look like this (a storage URL):
swift+https://example.com/container/obj-id
The swift+http:// URIs indicate there is an HTTP authentication URL.
The default for Swift is an HTTPS authentication URL, so swift:// and
swift+https:// are the same...
"""
def process_specs(self):
self.scheme = self.specs.get('scheme', 'swift+https')
self.user = self.specs.get('user')
self.key = self.specs.get('key')
self.auth_or_store_url = self.specs.get('auth_or_store_url')
self.container = self.specs.get('container')
self.obj = self.specs.get('obj')
def _get_credstring(self):
if self.user and self.key:
return '%s:%s@' % (urllib.quote(self.user), urllib.quote(self.key))
return ''
def get_uri(self):
auth_or_store_url = self.auth_or_store_url
if auth_or_store_url.startswith('http://'):
auth_or_store_url = auth_or_store_url[len('http://'):]
elif auth_or_store_url.startswith('https://'):
auth_or_store_url = auth_or_store_url[len('https://'):]
credstring = self._get_credstring()
auth_or_store_url = auth_or_store_url.strip('/')
container = self.container.strip('/')
obj = self.obj.strip('/')
return '%s://%s%s/%s/%s' % (self.scheme, credstring, auth_or_store_url,
container, obj)
def parse_uri(self, uri):
"""
Parse URLs. This method fixes an issue where credentials specified
in the URL are interpreted differently in Python 2.6.1+ than prior
versions of Python. It also deals with the peculiarity that new-style
Swift URIs have where a username can contain a ':', like so:
swift://account:user:pass@authurl.com/container/obj
"""
# Make sure that URIs that contain multiple schemes, such as:
# swift://user:pass@http://authurl.com/v1/container/obj
# are immediately rejected.
if uri.count('://') != 1:
reason = _("URI cannot contain more than one occurrence "
"of a scheme. If you have specified a URI like "
"swift://user:pass@http://authurl.com/v1/container/obj"
", you need to change it to use the "
"swift+http:// scheme, like so: "
"swift+http://user:pass@authurl.com/v1/container/obj")
LOG.debug(_("Invalid store URI: %(reason)s"), {'reason': reason})
raise exception.BadStoreUri(message=reason)
pieces = urlparse.urlparse(uri)
assert pieces.scheme in ('swift', 'swift+http', 'swift+https')
self.scheme = pieces.scheme
netloc = pieces.netloc
path = pieces.path.lstrip('/')
if netloc != '':
# > Python 2.6.1
if '@' in netloc:
creds, netloc = netloc.split('@')
else:
creds = None
else:
# Python 2.6.1 compat
# see lp659445 and Python issue7904
if '@' in path:
creds, path = path.split('@')
else:
creds = None
netloc = path[0:path.find('/')].strip('/')
path = path[path.find('/'):].strip('/')
if creds:
cred_parts = creds.split(':')
if len(cred_parts) != 2:
reason = (_("Badly formed credentials in Swift URI."))
LOG.debug(reason)
raise exception.BadStoreUri()
user, key = cred_parts
self.user = urllib.unquote(user)
self.key = urllib.unquote(key)
else:
self.user = None
self.key = None
path_parts = path.split('/')
try:
self.obj = path_parts.pop()
self.container = path_parts.pop()
if not netloc.startswith('http'):
# push hostname back into the remaining to build full authurl
path_parts.insert(0, netloc)
self.auth_or_store_url = '/'.join(path_parts)
except IndexError:
reason = _("Badly formed Swift URI.")
LOG.debug(reason)
raise exception.BadStoreUri()
@property
def swift_url(self):
"""
Creates a fully-qualified auth url that the Swift client library can
use. The scheme for the auth_url is determined using the scheme
included in the `location` field.
HTTPS is assumed, unless 'swift+http' is specified.
"""
if self.auth_or_store_url.startswith('http'):
return self.auth_or_store_url
else:
if self.scheme in ('swift+https', 'swift'):
auth_scheme = 'https://'
else:
auth_scheme = 'http://'
return ''.join([auth_scheme, self.auth_or_store_url])
def Store(context=None, loc=None):
if (CONF.swift_store_multi_tenant and
(loc is None or loc.store_location.user is None)):
return MultiTenantStore(context, loc)
return SingleTenantStore(context, loc)
class BaseStore(glance.store.base.Store):
CHUNKSIZE = 65536
def get_schemes(self):
return ('swift+https', 'swift', 'swift+http')
def configure(self):
_obj_size = self._option_get('swift_store_large_object_size')
self.large_object_size = _obj_size * ONE_MB
_chunk_size = self._option_get('swift_store_large_object_chunk_size')
self.large_object_chunk_size = _chunk_size * ONE_MB
self.admin_tenants = CONF.swift_store_admin_tenants
self.region = CONF.swift_store_region
self.service_type = CONF.swift_store_service_type
self.endpoint_type = CONF.swift_store_endpoint_type
self.snet = CONF.swift_enable_snet
self.insecure = CONF.swift_store_auth_insecure
self.ssl_compression = CONF.swift_store_ssl_compression
def get(self, location, connection=None):
location = location.store_location
if not connection:
connection = self.get_connection(location)
try:
resp_headers, resp_body = connection.get_object(
container=location.container, obj=location.obj,
resp_chunk_size=self.CHUNKSIZE)
except swiftclient.ClientException as e:
if e.http_status == httplib.NOT_FOUND:
msg = _("Swift could not find object %s.") % location.obj
LOG.warn(msg)
raise exception.NotFound(msg)
else:
raise
class ResponseIndexable(glance.store.Indexable):
def another(self):
try:
return self.wrapped.next()
except StopIteration:
return ''
length = int(resp_headers.get('content-length', 0))
return (ResponseIndexable(resp_body, length), length)
def get_size(self, location, connection=None):
location = location.store_location
if not connection:
connection = self.get_connection(location)
try:
resp_headers = connection.head_object(
container=location.container, obj=location.obj)
return int(resp_headers.get('content-length', 0))
except Exception:
return 0
def _option_get(self, param):
result = getattr(CONF, param)
if not result:
reason = (_("Could not find %(param)s in configuration "
"options.") % {'param': param})
LOG.error(reason)
raise exception.BadStoreConfiguration(store_name="swift",
reason=reason)
return result
def _delete_stale_chunks(self, connection, container, chunk_list):
for chunk in chunk_list:
LOG.debug(_("Deleting chunk %s") % chunk)
try:
connection.delete_object(container, chunk)
except Exception:
msg = _("Failed to delete orphaned chunk %s/%s")
LOG.exception(msg, container, chunk)
def add(self, image_id, image_file, image_size, connection=None):
location = self.create_location(image_id)
if not connection:
connection = self.get_connection(location)
self._create_container_if_missing(location.container, connection)
LOG.debug(_("Adding image object '%(obj_name)s' "
"to Swift") % dict(obj_name=location.obj))
try:
if image_size > 0 and image_size < self.large_object_size:
# Image size is known, and is less than large_object_size.
# Send to Swift with regular PUT.
obj_etag = connection.put_object(location.container,
location.obj, image_file,
content_length=image_size)
else:
# Write the image into Swift in chunks.
chunk_id = 1
if image_size > 0:
total_chunks = str(int(
math.ceil(float(image_size) /
float(self.large_object_chunk_size))))
else:
# image_size == 0 is when we don't know the size
# of the image. This can occur with older clients
# that don't inspect the payload size.
LOG.debug(_("Cannot determine image size. Adding as a "
"segmented object to Swift."))
total_chunks = '?'
checksum = hashlib.md5()
written_chunks = []
combined_chunks_size = 0
while True:
chunk_size = self.large_object_chunk_size
if image_size == 0:
content_length = None
else:
left = image_size - combined_chunks_size
if left == 0:
break
if chunk_size > left:
chunk_size = left
content_length = chunk_size
chunk_name = "%s-%05d" % (location.obj, chunk_id)
reader = ChunkReader(image_file, checksum, chunk_size)
try:
chunk_etag = connection.put_object(
location.container, chunk_name, reader,
content_length=content_length)
written_chunks.append(chunk_name)
except Exception:
# Delete orphaned segments from swift backend
with excutils.save_and_reraise_exception():
LOG.exception(_("Error during chunked upload to "
"backend, deleting stale chunks"))
self._delete_stale_chunks(connection,
location.container,
written_chunks)
bytes_read = reader.bytes_read
msg = (_("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
"%(total_chunks)s) of length %(bytes_read)d "
"to Swift returning MD5 of content: "
"%(chunk_etag)s") %
{'chunk_name': chunk_name,
'chunk_id': chunk_id,
'total_chunks': total_chunks,
'bytes_read': bytes_read,
'chunk_etag': chunk_etag})
LOG.debug(msg)
if bytes_read == 0:
# Delete the last chunk, because it's of zero size.
# This will happen if size == 0.
LOG.debug(_("Deleting final zero-length chunk"))
connection.delete_object(location.container,
chunk_name)
break
chunk_id += 1
combined_chunks_size += bytes_read
# In the case we have been given an unknown image size,
# set the size to the total size of the combined chunks.
if image_size == 0:
image_size = combined_chunks_size
# Now we write the object manifest and return the
# manifest's etag...
manifest = "%s/%s-" % (location.container, location.obj)
headers = {'ETag': hashlib.md5("").hexdigest(),
'X-Object-Manifest': manifest}
# The ETag returned for the manifest is actually the
# MD5 hash of the concatenated checksums of the strings
# of each chunk...so we ignore this result in favour of
# the MD5 of the entire image file contents, so that
# users can verify the image file contents accordingly
connection.put_object(location.container, location.obj,
None, headers=headers)
obj_etag = checksum.hexdigest()
# NOTE: We return the user and key here! Have to because
# location is used by the API server to return the actual
# image data. We *really* should consider NOT returning
# the location attribute from GET /images/<ID> and
# GET /images/details
return (location.get_uri(), image_size, obj_etag, {})
except swiftclient.ClientException as e:
if e.http_status == httplib.CONFLICT:
raise exception.Duplicate(_("Swift already has an image at "
"this location"))
msg = (_("Failed to add object to Swift.\n"
"Got error from Swift: %(e)s") % {'e': e})
LOG.error(msg)
raise glance.store.BackendException(msg)
def delete(self, location, connection=None):
location = location.store_location
if not connection:
connection = self.get_connection(location)
try:
# We request the manifest for the object. If one exists,
# that means the object was uploaded in chunks/segments,
# and we need to delete all the chunks as well as the
# manifest.
manifest = None
try:
headers = connection.head_object(
location.container, location.obj)
manifest = headers.get('x-object-manifest')
except swiftclient.ClientException as e:
if e.http_status != httplib.NOT_FOUND:
raise
if manifest:
# Delete all the chunks before the object manifest itself
obj_container, obj_prefix = manifest.split('/', 1)
segments = connection.get_container(
obj_container, prefix=obj_prefix)[1]
for segment in segments:
# TODO(jaypipes): This would be an easy area to parallelize
# since we're simply sending off parallelizable requests
# to Swift to delete stuff. It's not like we're going to
# be hogging up network or file I/O here...
connection.delete_object(obj_container,
segment['name'])
# Delete object (or, in segmented case, the manifest)
connection.delete_object(location.container, location.obj)
except swiftclient.ClientException as e:
if e.http_status == httplib.NOT_FOUND:
msg = _("Swift could not find image at URI.")
raise exception.NotFound(msg)
else:
raise
def _create_container_if_missing(self, container, connection):
"""
Creates a missing container in Swift if the
``swift_store_create_container_on_put`` option is set.
:param container: Name of container to create
:param connection: Connection to swift service
"""
try:
connection.head_container(container)
except swiftclient.ClientException as e:
if e.http_status == httplib.NOT_FOUND:
if CONF.swift_store_create_container_on_put:
try:
connection.put_container(container)
except swiftclient.ClientException as e:
msg = (_("Failed to add container to Swift.\n"
"Got error from Swift: %(e)s") % {'e': e})
raise glance.store.BackendException(msg)
else:
msg = (_("The container %(container)s does not exist in "
"Swift. Please set the "
"swift_store_create_container_on_put option"
"to add container to Swift automatically.") %
{'container': container})
raise glance.store.BackendException(msg)
else:
raise
def get_connection(self):
raise NotImplemented()
def create_location(self):
raise NotImplemented()
class SingleTenantStore(BaseStore):
EXAMPLE_URL = "swift://<USER>:<KEY>@<AUTH_ADDRESS>/<CONTAINER>/<FILE>"
def configure(self):
super(SingleTenantStore, self).configure()
self.auth_version = self._option_get('swift_store_auth_version')
def configure_add(self):
self.auth_address = self._option_get('swift_store_auth_address')
if self.auth_address.startswith('http://'):
self.scheme = 'swift+http'
else:
self.scheme = 'swift+https'
self.container = CONF.swift_store_container
self.user = self._option_get('swift_store_user')
self.key = self._option_get('swift_store_key')
def create_location(self, image_id):
specs = {'scheme': self.scheme,
'container': self.container,
'obj': str(image_id),
'auth_or_store_url': self.auth_address,
'user': self.user,
'key': self.key}
return StoreLocation(specs)
def get_connection(self, location):
if not location.user:
reason = (_("Location is missing user:password information."))
LOG.debug(reason)
raise exception.BadStoreUri(message=reason)
auth_url = location.swift_url
if not auth_url.endswith('/'):
auth_url += '/'
if self.auth_version == '2':
try:
tenant_name, user = location.user.split(':')
except ValueError:
reason = (_("Badly formed tenant:user '%(user)s' in "
"Swift URI") % {'user': location.user})
LOG.debug(reason)
raise exception.BadStoreUri()
else:
tenant_name = None
user = location.user
os_options = {}
if self.region:
os_options['region_name'] = self.region
os_options['endpoint_type'] = self.endpoint_type
os_options['service_type'] = self.service_type
return swiftclient.Connection(
auth_url, user, location.key, insecure=self.insecure,
tenant_name=tenant_name, snet=self.snet,
auth_version=self.auth_version, os_options=os_options,
ssl_compression=self.ssl_compression)
class MultiTenantStore(BaseStore):
EXAMPLE_URL = "swift://<SWIFT_URL>/<CONTAINER>/<FILE>"
def configure_add(self):
self.container = CONF.swift_store_container
if self.context is None:
reason = _("Multi-tenant Swift storage requires a context.")
raise exception.BadStoreConfiguration(store_name="swift",
reason=reason)
if self.context.service_catalog is None:
reason = _("Multi-tenant Swift storage requires "
"a service catalog.")
raise exception.BadStoreConfiguration(store_name="swift",
reason=reason)
self.storage_url = auth.get_endpoint(
self.context.service_catalog, service_type=self.service_type,
endpoint_region=self.region, endpoint_type=self.endpoint_type)
if self.storage_url.startswith('http://'):
self.scheme = 'swift+http'
else:
self.scheme = 'swift+https'
def delete(self, location, connection=None):
if not connection:
connection = self.get_connection(location.store_location)
super(MultiTenantStore, self).delete(location, connection)
connection.delete_container(location.store_location.container)
def set_acls(self, location, public=False, read_tenants=None,
write_tenants=None, connection=None):
location = location.store_location
if not connection:
connection = self.get_connection(location)
if read_tenants is None:
read_tenants = []
if write_tenants is None:
write_tenants = []
headers = {}
if public:
headers['X-Container-Read'] = ".r:*,.rlistings"
elif read_tenants:
headers['X-Container-Read'] = ','.join('%s:*' % i
for i in read_tenants)
else:
headers['X-Container-Read'] = ''
write_tenants.extend(self.admin_tenants)
if write_tenants:
headers['X-Container-Write'] = ','.join('%s:*' % i
for i in write_tenants)
else:
headers['X-Container-Write'] = ''
try:
connection.post_container(location.container, headers=headers)
except swiftclient.ClientException as e:
if e.http_status == httplib.NOT_FOUND:
msg = _("Swift could not find image at URI.")
raise exception.NotFound(msg)
else:
raise
def create_location(self, image_id):
specs = {'scheme': self.scheme,
'container': self.container + '_' + str(image_id),
'obj': str(image_id),
'auth_or_store_url': self.storage_url}
return StoreLocation(specs)
def get_connection(self, location):
return swiftclient.Connection(
None, self.context.user, None,
preauthurl=location.swift_url,
preauthtoken=self.context.auth_tok,
tenant_name=self.context.tenant,
auth_version='2', snet=self.snet, insecure=self.insecure,
ssl_compression=self.ssl_compression)
class ChunkReader(object):
def __init__(self, fd, checksum, total):
self.fd = fd
self.checksum = checksum
self.total = total
self.bytes_read = 0
def read(self, i):
left = self.total - self.bytes_read
if i > left:
i = left
result = self.fd.read(i)
self.bytes_read += len(result)
self.checksum.update(result)
return result

33
setup.cfg Normal file
View File

@ -0,0 +1,33 @@
[metadata]
name = glance.store
version = 2014.1
summary = OpenStack Image Service Store Library
description-file =
README.rst
author = OpenStack
author-email = openstack-dev@lists.openstack.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 2.6
[files]
packages =
glance
namespace_packages =
glance
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
[upload_sphinx]
upload-dir = doc/build/html

22
setup.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
setuptools.setup(
setup_requires=['pbr'],
pbr=True)