Config & Import fixes

This commit is contained in:
Flavio Percoco 2014-01-30 17:59:48 +01:00
parent 0ef71079f9
commit d6c5389bde
10 changed files with 47 additions and 1300 deletions

View File

@ -19,21 +19,23 @@ import sys
from oslo.config import cfg
#import glance.context
#import glance.domain.proxy
from glance.store.common import exception
from glance.store.common import utils
import glance.context
import glance.domain.proxy
from glance.openstack.common.gettextutils import _
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.store import location
from glance.store import scrubber
from glance.store.openstack.common.gettextutils import _
from glance.store.openstack.common import importutils
from glance.store.openstack.common import log as logging
LOG = logging.getLogger(__name__)
_DEPRECATED_STORE_OPTS = [
cfg.DeprecatedOpt('known_stores'),
cfg.StrOpt('default_store'),
cfg.DeprecatedOpt('default_store')
]
_SCRUBBER_OPTS = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
@ -60,26 +62,21 @@ _STORE_OPTS = [
CONF = cfg.CONF
_STORE_CFG_GROUP = "glance_store"
CONF.register_opts(_DEPRECATED_STORE_OPTS)
CONF.register_opts(_STORE_OPTS, group=_STORE_CFG_GROUP)
def _oslo_config_options():
return itertools.chain(((opt, None) for opt in _DEPRECATED_STORE_OPTS),
return itertools.chain(((opt, None) for opt in _SCRUBBER_OPTS),
((opt, _STORE_CFG_GROUP) for opt in _STORE_OPTS))
class BackendException(Exception):
pass
class UnsupportedBackend(BackendException):
pass
def register_opts(conf):
for opt, group in _oslo_config_options():
conf.register_opt(opt, group=group)
class Indexable(object):
"""Indexable for file-like objs iterators
"""
Wrapper that allows an iterator or filelike be treated as an indexable
data structure. This is required in the case where the return value from
Store.get() is passed to Store.add() when adding a Copy-From image to a
@ -153,14 +150,15 @@ def _get_store_class(store_entry):
return store_cls
def create_stores():
def create_stores(conf=CONF):
"""
Registers all store modules and all schemes
from the given config. Duplicates are not re-registered.
"""
store_count = 0
store_classes = set()
for store_entry in CONF.known_stores:
for store_entry in conf.glance_store.stores:
store_entry = store_entry.strip()
if not store_entry:
continue
@ -295,6 +293,8 @@ def safe_delete_from_backend(context, uri, image_id, **kwargs):
def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs):
"""Given a uri, schedule the deletion of an image location."""
# FIXME(flaper87): Remove this function
from glance.store import scrubber
(file_queue, _db_queue) = scrubber.get_scrub_queues()
# NOTE(zhiyan): Defautly ask glance-api store using file based queue.
# In future we can change it using DB based queued instead,
@ -377,38 +377,6 @@ def set_acls(context, location_uri, public=False, read_tenants=[],
except NotImplementedError:
LOG.debug(_("Skipping store.set_acls... not implemented."))
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, store_api):
self.context = context
self.store_api = store_api
proxy_kwargs = {'context': context, 'store_api': store_api}
super(ImageRepoProxy, self).__init__(image_repo,
item_proxy_class=ImageProxy,
item_proxy_kwargs=proxy_kwargs)
def _set_acls(self, image):
public = image.visibility == 'public'
member_ids = []
if image.locations and not public:
member_repo = image.get_member_repo()
member_ids = [m.member_id for m in member_repo.list()]
for location in image.locations:
self.store_api.set_acls(self.context, location['url'], public,
read_tenants=member_ids)
def add(self, image):
result = super(ImageRepoProxy, self).add(image)
self._set_acls(image)
return result
def save(self, image):
result = super(ImageRepoProxy, self).save(image)
self._set_acls(image)
return result
def _check_location_uri(context, store_api, uri):
"""
Check if an image location uri is valid.
@ -442,280 +410,3 @@ def _set_image_size(context, image, locations):
# NOTE(flwang): This assumes all locations have the same size
image.size = size_from_backend
break
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, factory, context, store_api):
self.context = context
self.store_api = store_api
proxy_kwargs = {'context': context, 'store_api': store_api}
super(ImageFactoryProxy, self).__init__(factory,
proxy_class=ImageProxy,
proxy_kwargs=proxy_kwargs)
def new_image(self, **kwargs):
locations = kwargs.get('locations', [])
for l in locations:
_check_image_location(self.context, self.store_api, l)
if locations.count(l) > 1:
raise exception.DuplicateLocation(location=l['url'])
return super(ImageFactoryProxy, self).new_image(**kwargs)
class StoreLocations(collections.MutableSequence):
"""
The proxy for store location property. It takes responsibility for:
1. Location uri correctness checking when adding a new location.
2. Remove the image data from the store when a location is removed
from an image.
"""
def __init__(self, image_proxy, value):
self.image_proxy = image_proxy
if isinstance(value, list):
self.value = value
else:
self.value = list(value)
def append(self, location):
# NOTE(flaper87): Insert this
# location at the very end of
# the value list.
self.insert(len(self.value), location)
def extend(self, other):
if isinstance(other, StoreLocations):
locations = other.value
else:
locations = list(other)
for location in locations:
self.append(location)
def insert(self, i, location):
_check_image_location(self.image_proxy.context,
self.image_proxy.store_api, location)
if location in self.value:
raise exception.DuplicateLocation(location=location['url'])
self.value.insert(i, location)
_set_image_size(self.image_proxy.context,
self.image_proxy,
[location])
def pop(self, i=-1):
location = self.value.pop(i)
try:
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
except Exception:
self.value.insert(i, location)
raise
return location
def count(self, location):
return self.value.count(location)
def index(self, location, *args):
return self.value.index(location, *args)
def remove(self, location):
if self.count(location):
self.pop(self.index(location))
else:
self.value.remove(location)
def reverse(self):
self.value.reverse()
# Mutable sequence, so not hashable
__hash__ = None
def __getitem__(self, i):
return self.value.__getitem__(i)
def __setitem__(self, i, location):
_check_image_location(self.image_proxy.context,
self.image_proxy.store_api, location)
self.value.__setitem__(i, location)
_set_image_size(self.image_proxy.context,
self.image_proxy,
[location])
def __delitem__(self, i):
location = None
try:
location = self.value.__getitem__(i)
except Exception:
return self.value.__delitem__(i)
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
self.value.__delitem__(i)
def __delslice__(self, i, j):
i = max(i, 0)
j = max(j, 0)
locations = []
try:
locations = self.value.__getslice__(i, j)
except Exception:
return self.value.__delslice__(i, j)
for location in locations:
delete_image_from_backend(self.image_proxy.context,
self.image_proxy.store_api,
self.image_proxy.image.image_id,
location['url'])
self.value.__delitem__(i)
def __iadd__(self, other):
self.extend(other)
return self
def __contains__(self, location):
return location in self.value
def __len__(self):
return len(self.value)
def __cast(self, other):
if isinstance(other, StoreLocations):
return other.value
else:
return other
def __cmp__(self, other):
return cmp(self.value, self.__cast(other))
def __iter__(self):
return iter(self.value)
def _locations_proxy(target, attr):
"""
Make a location property proxy on the image object.
:param target: the image object on which to add the proxy
:param attr: the property proxy we want to hook
"""
def get_attr(self):
value = getattr(getattr(self, target), attr)
return StoreLocations(self, value)
def set_attr(self, value):
if not isinstance(value, (list, StoreLocations)):
raise exception.BadStoreUri(_('Invalid locations: %s') % value)
ori_value = getattr(getattr(self, target), attr)
if ori_value != value:
# NOTE(zhiyan): Enforced locations list was previously empty list.
if len(ori_value) > 0:
raise exception.Invalid(_('Original locations is not empty: '
'%s') % ori_value)
# NOTE(zhiyan): Check locations are all valid.
for location in value:
_check_image_location(self.context, self.store_api,
location)
if value.count(location) > 1:
raise exception.DuplicateLocation(location=location['url'])
_set_image_size(self.context, getattr(self, target), value)
return setattr(getattr(self, target), attr, list(value))
def del_attr(self):
value = getattr(getattr(self, target), attr)
while len(value):
delete_image_from_backend(self.context, self.store_api,
self.image.image_id, value[0]['url'])
del value[0]
setattr(getattr(self, target), attr, value)
return delattr(getattr(self, target), attr)
return property(get_attr, set_attr, del_attr)
class ImageProxy(glance.domain.proxy.Image):
locations = _locations_proxy('image', 'locations')
def __init__(self, image, context, store_api):
self.image = image
self.context = context
self.store_api = store_api
proxy_kwargs = {
'context': context,
'image': self,
'store_api': store_api,
}
super(ImageProxy, self).__init__(
image, member_repo_proxy_class=ImageMemberRepoProxy,
member_repo_proxy_kwargs=proxy_kwargs)
def delete(self):
self.image.delete()
if self.image.locations:
for location in self.image.locations:
self.store_api.delete_image_from_backend(self.context,
self.store_api,
self.image.image_id,
location['url'])
def set_data(self, data, size=None):
if size is None:
size = 0 # NOTE(markwash): zero -> unknown size
location, size, checksum, loc_meta = self.store_api.add_to_backend(
self.context, CONF.default_store,
self.image.image_id, utils.CooperativeReader(data), size)
self.image.locations = [{'url': location, 'metadata': loc_meta}]
self.image.size = size
self.image.checksum = checksum
self.image.status = 'active'
def get_data(self):
if not self.image.locations:
raise exception.NotFound(_("No image data could be found"))
err = None
for loc in self.image.locations:
try:
data, size = self.store_api.get_from_backend(self.context,
loc['url'])
return data
except Exception as e:
LOG.warn(_('Get image %(id)s data from %(loc)s '
'failed: %(err)s.') % {'id': self.image.image_id,
'loc': loc, 'err': e})
err = e
# tried all locations
LOG.error(_('Glance tried all locations to get data for image %s '
'but all have failed.') % self.image.image_id)
raise err
class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, repo, image, context, store_api):
self.repo = repo
self.image = image
self.context = context
self.store_api = store_api
super(ImageMemberRepoProxy, self).__init__(repo)
def _set_acls(self):
public = self.image.visibility == 'public'
if self.image.locations and not public:
member_ids = [m.member_id for m in self.repo.list()]
for location in self.image.locations:
self.store_api.set_acls(self.context, location['url'], public,
read_tenants=member_ids)
def add(self, member):
super(ImageMemberRepoProxy, self).add(member)
self._set_acls()
def remove(self, member):
super(ImageMemberRepoProxy, self).remove(member)
self._set_acls()

View File

@ -16,11 +16,10 @@
"""Base class for all storage backends"""
from glance.common import exception
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.openstack.common import strutils
from glance.openstack.common import units
from glance.store.common import exception
from glance.store.openstack.common.gettextutils import _
from glance.store.openstack.common import importutils
import glance.store.openstack.common.log as logging
LOG = logging.getLogger(__name__)
@ -39,7 +38,7 @@ def _exception_to_unicode(exc):
class Store(object):
CHUNKSIZE = 16 * units.Mi # 16M
CHUNKSIZE = 16 * (1024 * 1024) # 16M
def __init__(self, context=None, location=None):
"""

View File

@ -15,6 +15,16 @@
"""Glance Store exception subclasses"""
from glance.store.openstack.common.gettextutils import _
class BackendException(Exception):
pass
class UnsupportedBackend(BackendException):
pass
class GlanceStoreException(Exception):
"""

View File

@ -18,49 +18,17 @@
System-level utilities and helper functions.
"""
import errno
try:
from eventlet import sleep
except ImportError:
from time import sleep
from eventlet.green import socket
import functools
import os
import platform
import subprocess
import sys
import uuid
from OpenSSL import crypto
from oslo.config import cfg
from webob import exc
from glance.store.openstack.common import log as logging
from glance.common import exception
import glance.openstack.common.log as logging
from glance.openstack.common import strutils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
FEATURE_BLACKLIST = ['content-length', 'content-type', 'x-image-meta-size']
# Whitelist of v1 API headers of form x-image-meta-xxx
IMAGE_META_HEADERS = ['x-image-meta-location', 'x-image-meta-size',
'x-image-meta-is_public', 'x-image-meta-disk_format',
'x-image-meta-container_format', 'x-image-meta-name',
'x-image-meta-status', 'x-image-meta-copy_from',
'x-image-meta-uri', 'x-image-meta-checksum',
'x-image-meta-created_at', 'x-image-meta-updated_at',
'x-image-meta-deleted_at', 'x-image-meta-min_ram',
'x-image-meta-min_disk', 'x-image-meta-owner',
'x-image-meta-store', 'x-image-meta-id',
'x-image-meta-protected', 'x-image-meta-deleted']
GLANCE_TEST_SOCKET_FD_STR = 'GLANCE_TEST_SOCKET_FD'
def chunkreadable(iter, chunk_size=65536):
"""
@ -156,404 +124,3 @@ class CooperativeReader(object):
def __iter__(self):
return cooperative_iter(self.fd.__iter__())
class LimitingReader(object):
"""
Reader designed to fail when reading image data past the configured
allowable amount.
"""
def __init__(self, data, limit):
"""
:param data: Underlying image data object
:param limit: maximum number of bytes the reader should allow
"""
self.data = data
self.limit = limit
self.bytes_read = 0
def __iter__(self):
for chunk in self.data:
self.bytes_read += len(chunk)
if self.bytes_read > self.limit:
raise exception.ImageSizeLimitExceeded()
else:
yield chunk
def read(self, i):
result = self.data.read(i)
self.bytes_read += len(result)
if self.bytes_read > self.limit:
raise exception.ImageSizeLimitExceeded()
return result
def image_meta_to_http_headers(image_meta):
"""
Returns a set of image metadata into a dict
of HTTP headers that can be fed to either a Webob
Request object or an httplib.HTTP(S)Connection object
:param image_meta: Mapping of image metadata
"""
headers = {}
for k, v in image_meta.items():
if v is not None:
if k == 'properties':
for pk, pv in v.items():
if pv is not None:
headers["x-image-meta-property-%s"
% pk.lower()] = unicode(pv)
else:
headers["x-image-meta-%s" % k.lower()] = unicode(v)
return headers
def add_features_to_http_headers(features, headers):
"""
Adds additional headers representing glance features to be enabled.
:param headers: Base set of headers
:param features: Map of enabled features
"""
if features:
for k, v in features.items():
if k.lower() in FEATURE_BLACKLIST:
raise exception.UnsupportedHeaderFeature(feature=k)
if v is not None:
headers[k.lower()] = unicode(v)
def get_image_meta_from_headers(response):
"""
Processes HTTP headers from a supplied response that
match the x-image-meta and x-image-meta-property and
returns a mapping of image metadata and properties
:param response: Response to process
"""
result = {}
properties = {}
if hasattr(response, 'getheaders'): # httplib.HTTPResponse
headers = response.getheaders()
else: # webob.Response
headers = response.headers.items()
for key, value in headers:
key = str(key.lower())
if key.startswith('x-image-meta-property-'):
field_name = key[len('x-image-meta-property-'):].replace('-', '_')
properties[field_name] = value or None
elif key.startswith('x-image-meta-'):
field_name = key[len('x-image-meta-'):].replace('-', '_')
if 'x-image-meta-' + field_name not in IMAGE_META_HEADERS:
msg = _("Bad header: %(header_name)s") % {'header_name': key}
raise exc.HTTPBadRequest(msg, content_type="text/plain")
result[field_name] = value or None
result['properties'] = properties
for key in ('size', 'min_disk', 'min_ram'):
if key in result:
try:
result[key] = int(result[key])
except ValueError:
extra = (_("Cannot convert image %(key)s '%(value)s' "
"to an integer.")
% {'key': key, 'value': result[key]})
raise exception.InvalidParameterValue(value=result[key],
param=key,
extra_msg=extra)
if result[key] < 0:
extra = (_("Image %(key)s must be >= 0 "
"('%(value)s' specified).")
% {'key': key, 'value': result[key]})
raise exception.InvalidParameterValue(value=result[key],
param=key,
extra_msg=extra)
for key in ('is_public', 'deleted', 'protected'):
if key in result:
result[key] = strutils.bool_from_string(result[key])
return result
def safe_mkdirs(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def safe_remove(path):
try:
os.remove(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
class PrettyTable(object):
"""Creates an ASCII art table for use in bin/glance
Example:
ID Name Size Hits
--- ----------------- ------------ -----
122 image 22 0
"""
def __init__(self):
self.columns = []
def add_column(self, width, label="", just='l'):
"""Add a column to the table
:param width: number of characters wide the column should be
:param label: column heading
:param just: justification for the column, 'l' for left,
'r' for right
"""
self.columns.append((width, label, just))
def make_header(self):
label_parts = []
break_parts = []
for width, label, _ in self.columns:
# NOTE(sirp): headers are always left justified
label_part = self._clip_and_justify(label, width, 'l')
label_parts.append(label_part)
break_part = '-' * width
break_parts.append(break_part)
label_line = ' '.join(label_parts)
break_line = ' '.join(break_parts)
return '\n'.join([label_line, break_line])
def make_row(self, *args):
row = args
row_parts = []
for data, (width, _, just) in zip(row, self.columns):
row_part = self._clip_and_justify(data, width, just)
row_parts.append(row_part)
row_line = ' '.join(row_parts)
return row_line
@staticmethod
def _clip_and_justify(data, width, just):
# clip field to column width
clipped_data = str(data)[:width]
if just == 'r':
# right justify
justified = clipped_data.rjust(width)
else:
# left justify
justified = clipped_data.ljust(width)
return justified
def get_terminal_size():
def _get_terminal_size_posix():
import fcntl
import struct
import termios
height_width = None
try:
height_width = struct.unpack('hh', fcntl.ioctl(sys.stderr.fileno(),
termios.TIOCGWINSZ,
struct.pack('HH', 0, 0)))
except Exception:
pass
if not height_width:
try:
p = subprocess.Popen(['stty', 'size'],
shell=False,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
result = p.communicate()
if p.returncode == 0:
return tuple(int(x) for x in result[0].split())
except Exception:
pass
return height_width
def _get_terminal_size_win32():
try:
from ctypes import windll, create_string_buffer
handle = windll.kernel32.GetStdHandle(-12)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi)
except Exception:
return None
if res:
import struct
unpack_tmp = struct.unpack("hhhhHhhhhhh", csbi.raw)
(bufx, bufy, curx, cury, wattr,
left, top, right, bottom, maxx, maxy) = unpack_tmp
height = bottom - top + 1
width = right - left + 1
return (height, width)
else:
return None
def _get_terminal_size_unknownOS():
raise NotImplementedError
func = {'posix': _get_terminal_size_posix,
'win32': _get_terminal_size_win32}
height_width = func.get(platform.os.name, _get_terminal_size_unknownOS)()
if height_width is None:
raise exception.Invalid()
for i in height_width:
if not isinstance(i, int) or i <= 0:
raise exception.Invalid()
return height_width[0], height_width[1]
def mutating(func):
"""Decorator to enforce read-only logic"""
@functools.wraps(func)
def wrapped(self, req, *args, **kwargs):
if req.context.read_only:
msg = _("Read-only access")
LOG.debug(msg)
raise exc.HTTPForbidden(msg, request=req,
content_type="text/plain")
return func(self, req, *args, **kwargs)
return wrapped
def setup_remote_pydev_debug(host, port):
error_msg = ('Error setting up the debug environment. Verify that the'
' option pydev_worker_debug_port is pointing to a valid '
'hostname or IP on which a pydev server is listening on'
' the port indicated by pydev_worker_debug_port.')
try:
try:
from pydev import pydevd
except ImportError:
import pydevd
pydevd.settrace(host,
port=port,
stdoutToServer=True,
stderrToServer=True)
return True
except Exception:
LOG.exception(error_msg)
raise
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, config_group=None, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
self.__config_group = config_group
def __get_backend(self):
if not self.__backend:
if self.__config_group is None:
backend_name = CONF[self.__pivot]
else:
backend_name = CONF[self.__config_group][self.__pivot]
if backend_name not in self.__backends:
msg = _('Invalid backend: %s') % backend_name
raise exception.GlanceException(msg)
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)
def validate_key_cert(key_file, cert_file):
try:
error_key_name = "private key"
error_filename = key_file
key_str = open(key_file, "r").read()
key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_str)
error_key_name = "certficate"
error_filename = cert_file
cert_str = open(cert_file, "r").read()
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
except IOError as ioe:
raise RuntimeError(_("There is a problem with your %(error_key_name)s "
"%(error_filename)s. Please verify it."
" Error: %(ioe)s") %
{'error_key_name': error_key_name,
'error_filename': error_filename,
'ioe': ioe})
except crypto.Error as ce:
raise RuntimeError(_("There is a problem with your %(error_key_name)s "
"%(error_filename)s. Please verify it. OpenSSL"
" error: %(ce)s") %
{'error_key_name': error_key_name,
'error_filename': error_filename,
'ce': ce})
try:
data = str(uuid.uuid4())
digest = "sha1"
out = crypto.sign(key, data, digest)
crypto.verify(cert, out, data, digest)
except crypto.Error as ce:
raise RuntimeError(_("There is a problem with your key pair. "
"Please verify that cert %(cert_file)s and "
"key %(key_file)s belong together. OpenSSL "
"error %(ce)s") % {'cert_file': cert_file,
'key_file': key_file,
'ce': ce})
def get_test_suite_socket():
global GLANCE_TEST_SOCKET_FD_STR
if GLANCE_TEST_SOCKET_FD_STR in os.environ:
fd = int(os.environ[GLANCE_TEST_SOCKET_FD_STR])
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
sock = socket.SocketType(_sock=sock)
sock.listen(CONF.backlog)
del os.environ[GLANCE_TEST_SOCKET_FD_STR]
os.close(fd)
return sock
return None
def is_uuid_like(val):
"""Returns validation of a value as a UUID.
For our purposes, a UUID is a canonical form string:
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"""
try:
return str(uuid.UUID(val)) == val
except (TypeError, ValueError, AttributeError):
return False

View File

@ -24,13 +24,14 @@ import urlparse
from oslo.config import cfg
from glance.store.common import exception
from glance.store.common import utils
from glance.openstack.common import jsonutils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
from glance.store.common import exception
from glance.store.common import utils
import glance.store.location
from glance.store.openstack.common.gettextutils import _
from glance.store.openstack.common import jsonutils
import glance.store.openstack.common.log as logging
LOG = logging.getLogger(__name__)

View File

@ -39,8 +39,8 @@ credentials and is **not** user-facing.
import urlparse
from glance.common import exception
import glance.openstack.common.log as logging
from glance.store.common import exception
from glance.store.openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@ -1,523 +0,0 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import calendar
import eventlet
import os
import time
from oslo.config import cfg
from glance.common import crypt
from glance.common import exception
from glance.common import utils
from glance import context
from glance.openstack.common import lockutils
import glance.openstack.common.log as logging
import glance.registry.client.v1.api as registry
LOG = logging.getLogger(__name__)
scrubber_opts = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf')),
cfg.IntOpt('scrub_time', default=0,
help=_('The amount of time in seconds to delay before '
'performing a delete.')),
cfg.BoolOpt('cleanup_scrubber', default=False,
help=_('A boolean that determines if the scrubber should '
'clean up the files it uses for taking data. Only '
'one server in your deployment should be designated '
'the cleanup host.')),
cfg.IntOpt('cleanup_scrubber_time', default=86400,
help=_('Items must have a modified time that is older than '
'this value in order to be candidates for cleanup.'))
]
CONF = cfg.CONF
CONF.register_opts(scrubber_opts)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
class ScrubQueue(object):
"""Image scrub queue base class.
The queue contains image's location which need to delete from backend.
"""
def __init__(self):
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
@abc.abstractmethod
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
pass
@abc.abstractmethod
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
pass
class ScrubFileQueue(ScrubQueue):
"""File-based image scrub queue class."""
def __init__(self):
super(ScrubFileQueue, self).__init__()
self.scrubber_datadir = CONF.scrubber_datadir
utils.safe_mkdirs(self.scrubber_datadir)
self.scrub_time = CONF.scrub_time
self.metadata_encryption_key = CONF.metadata_encryption_key
def _read_queue_file(self, file_path):
"""Reading queue file to loading deleted location and timestamp out.
:param file_path: Queue file full path
:retval a list of image location timestamp tuple from queue file
"""
uris = []
delete_times = []
try:
with open(file_path, 'r') as f:
while True:
uri = f.readline().strip()
if uri:
uris.append(uri)
delete_times.append(int(f.readline().strip()))
else:
break
except Exception:
LOG.error(_("%s file can not be read.") % file_path)
return uris, delete_times
def _update_queue_file(self, file_path, remove_record_idxs):
"""Updating queue file to remove such queue records.
:param file_path: Queue file full path
:param remove_record_idxs: A list of record index those want to remove
"""
try:
with open(file_path, 'r') as f:
lines = f.readlines()
# NOTE(zhiyan) we need bottom up removing to
# keep record index be valid.
remove_record_idxs.sort(reverse=True)
for record_idx in remove_record_idxs:
# Each record has two lines
line_no = (record_idx + 1) * 2 - 1
del lines[line_no:line_no + 2]
with open(file_path, 'w') as f:
f.write(''.join(lines))
os.chmod(file_path, 0o600)
except Exception:
LOG.error(_("%s file can not be wrote.") % file_path)
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
# NOTE(zhiyan): make sure scrubber does not cleanup
# 'pending_delete' images concurrently before the code
# get lock and reach here.
try:
image = self.registry.get_image(image_id)
if image['status'] == 'deleted':
return
except exception.NotFound as e:
LOG.error(_("Failed to find image to delete: "
"%(e)s"), {'e': e})
return
delete_time = time.time() + self.scrub_time
file_path = os.path.join(self.scrubber_datadir, str(image_id))
if self.metadata_encryption_key is not None:
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
uri, 64)
if os.path.exists(file_path):
# Append the uri of location to the queue file
with open(file_path, 'a') as f:
f.write('\n')
f.write('\n'.join([uri, str(int(delete_time))]))
else:
# NOTE(zhiyan): Protect the file before we write any data.
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.utime(file_path, (delete_time, delete_time))
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image image_id and location tuple from scrub queue
"""
if not os.path.exists(self.scrubber_datadir):
LOG.info(_("%s directory does not exist.") % self.scrubber_datadir)
return []
ret = []
for root, dirs, files in os.walk(self.scrubber_datadir):
for image_id in files:
if not utils.is_uuid_like(image_id):
continue
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
file_path = os.path.join(self.scrubber_datadir, image_id)
uris, delete_times = self._read_queue_file(file_path)
remove_record_idxs = []
skipped = False
for (record_idx, delete_time) in enumerate(delete_times):
if delete_time > time.time():
skipped = True
continue
else:
ret.append((image_id, uris[record_idx]))
remove_record_idxs.append(record_idx)
if remove:
if skipped:
# NOTE(zhiyan): remove location records from
# the queue file.
self._update_queue_file(file_path,
remove_record_idxs)
else:
utils.safe_remove(file_path)
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
return os.path.exists(os.path.join(self.scrubber_datadir,
str(image_id)))
class ScrubDBQueue(ScrubQueue):
"""Database-based image scrub queue class."""
def __init__(self):
super(ScrubDBQueue, self).__init__()
self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
def add_location(self, image_id, uri):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
"""
raise NotImplementedError
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image id and location tuple from scrub queue
"""
filters = {'deleted': True,
'is_public': 'none',
'status': 'pending_delete'}
ret = []
for image in self.registry.get_images_detailed(filters=filters):
deleted_at = image.get('deleted_at')
if not deleted_at:
continue
# NOTE: Strip off microseconds which may occur after the last '.,'
# Example: 2012-07-07T19:14:34.974216
date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
delete_time = calendar.timegm(time.strptime(date_str,
"%Y-%m-%dT%H:%M:%S"))
if delete_time + self.cleanup_scrubber_time > time.time():
continue
ret.extend([(image['id'], location['uri'])
for location in image['location_data']])
if remove:
self.registry.update_image(image['id'], {'status': 'deleted'})
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean value to inform including or not
"""
try:
image = self.registry.get_image(image_id)
return image['status'] == 'pending_delete'
except exception.NotFound as e:
return False
_file_queue = None
_db_queue = None
def get_scrub_queues():
global _file_queue, _db_queue
if not _file_queue:
_file_queue = ScrubFileQueue()
if not _db_queue:
_db_queue = ScrubDBQueue()
return (_file_queue, _db_queue)
class Daemon(object):
def __init__(self, wakeup_time=300, threads=1000):
LOG.info(_("Starting Daemon: wakeup_time=%(wakeup_time)s "
"threads=%(threads)s"),
{'wakeup_time': wakeup_time, 'threads': threads})
self.wakeup_time = wakeup_time
self.event = eventlet.event.Event()
self.pool = eventlet.greenpool.GreenPool(threads)
def start(self, application):
self._run(application)
def wait(self):
try:
self.event.wait()
except KeyboardInterrupt:
msg = _("Daemon Shutdown on KeyboardInterrupt")
LOG.info(msg)
def _run(self, application):
LOG.debug(_("Running application"))
self.pool.spawn_n(application.run, self.pool, self.event)
eventlet.spawn_after(self.wakeup_time, self._run, application)
LOG.debug(_("Next run scheduled in %s seconds") % self.wakeup_time)
class Scrubber(object):
def __init__(self, store_api):
LOG.info(_("Initializing scrubber with configuration: %s") %
unicode({'scrubber_datadir': CONF.scrubber_datadir,
'cleanup': CONF.cleanup_scrubber,
'cleanup_time': CONF.cleanup_scrubber_time,
'registry_host': CONF.registry_host,
'registry_port': CONF.registry_port}))
utils.safe_mkdirs(CONF.scrubber_datadir)
self.store_api = store_api
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
(self.file_queue, self.db_queue) = get_scrub_queues()
def _get_delete_jobs(self, queue, pop):
try:
if pop:
image_id_uri_list = queue.pop_all_locations()
else:
image_id_uri_list = queue.get_all_locations()
except Exception:
LOG.error(_("Can not %s scrub jobs from queue.") %
'pop' if pop else 'get')
return None
delete_jobs = {}
for image_id, image_uri in image_id_uri_list:
if image_id not in delete_jobs:
delete_jobs[image_id] = []
delete_jobs[image_id].append((image_id, image_uri))
return delete_jobs
def run(self, pool, event=None):
delete_jobs = self._get_delete_jobs(self.file_queue, True)
if delete_jobs:
for image_id, jobs in delete_jobs.iteritems():
self._scrub_image(pool, image_id, jobs)
if CONF.cleanup_scrubber:
self._cleanup(pool)
def _scrub_image(self, pool, image_id, delete_jobs):
if len(delete_jobs) == 0:
return
LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") %
{'id': image_id, 'count': len(delete_jobs)})
# NOTE(bourke): The starmap must be iterated to do work
list(pool.starmap(self._delete_image_from_backend, delete_jobs))
image = self.registry.get_image(image_id)
if (image['status'] == 'pending_delete' and
not self.file_queue.has_image(image_id)):
self.registry.update_image(image_id, {'status': 'deleted'})
def _delete_image_from_backend(self, image_id, uri):
if CONF.metadata_encryption_key is not None:
uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
try:
LOG.debug(_("Deleting URI from image %(image_id)s.") %
{'image_id': image_id})
# Here we create a request context with credentials to support
# delayed delete when using multi-tenant backend storage
admin_tenant = CONF.admin_tenant_name
auth_token = self.registry.auth_tok
admin_context = context.RequestContext(user=CONF.admin_user,
tenant=admin_tenant,
auth_tok=auth_token)
self.store_api.delete_from_backend(admin_context, uri)
except Exception:
msg = _("Failed to delete URI from image %(image_id)s")
LOG.error(msg % {'image_id': image_id})
def _read_cleanup_file(self, file_path):
"""Reading cleanup to get latest cleanup timestamp.
:param file_path: Cleanup status file full path
:retval latest cleanup timestamp
"""
try:
if not os.path.exists(file_path):
msg = _("%s file is not exists.") % unicode(file_path)
raise Exception(msg)
atime = int(os.path.getatime(file_path))
mtime = int(os.path.getmtime(file_path))
if atime != mtime:
msg = _("%s file contains conflicting cleanup "
"timestamp.") % unicode