Adds Driver Layer to Image Cache

Fixes LP Bug#879136 - KeyError: 'image' when doing nova image-list
Fixes LP Bug#819936 - New image cache breaks Glance on Windows

This patch further refactors the image cache by adding an
adaptable driver layer. The existing filesystem-based driver,
which depended on python-xattr and specific fstab settings, has
been moved to /glance/image_cache/drivers/xattr.py, and the new
default driver is based on SQLite and has no special requirements.

The image cache now exposes a simple interface for pruning the
cache. Instead of living in /glance/image_cache/pruner.py, the
prune logic is now self-contained in the ImageCache.prune()
method, with pruning calling the simple, well-defined driver
methods get_least_recently_accessed() and get_cache_size().
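
In sketch form, the new prune loop boils down to the following
(simplified from the ImageCache.prune() code in the diff below,
with logging elided):

    def prune(cache):
        # Evict least-recently-accessed entries until the cache
        # fits under image_cache_max_size.
        max_size = int(cache.options.get('image_cache_max_size',
                                         10 * 1024 * 1024 * 1024))
        current_size = cache.driver.get_cache_size()
        pruned_files = pruned_bytes = 0
        while current_size > max_size:
            entry = cache.driver.get_least_recently_accessed()
            if not entry:
                break
            image_id, size = entry
            cache.driver.delete(image_id)
            pruned_files += 1
            pruned_bytes += size
            current_size -= size
        return pruned_files, pruned_bytes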

Adds a functional test case for the caching middleware and
documentation on how to configure the image cache drivers.

TODO: cache-manage middleware...
TODO: cache management docs

Change-Id: Id7ae73549d6bb39222eb7ac0427b0083fd1af3ec
Jay Pipes 2011-10-21 16:10:32 -04:00
parent d521d6529e
commit 39c8557434
26 changed files with 1906 additions and 1145 deletions

View File

@@ -19,16 +19,16 @@
# under the License.
"""
Glance Image Cache Invalid Cache Entry and Stalled Image Reaper
Glance Image Cache Invalid Cache Entry and Stalled Image Cleaner
This is meant to be run as a periodic task from cron.
If something goes wrong while we're caching an image (for example the fetch
times out, or an exception is raised), we create an 'invalid' entry. These
entries are left around for debugging purposes. However, after some period of
time, we want to cleans these up, aka reap them.
time, we want to clean these up.
Also, if an incomplete image hangs around past the image_cache_stall_timeout
Also, if an incomplete image hangs around past the image_cache_stall_time
period, we automatically sweep it up.
"""
@@ -70,7 +70,7 @@ if __name__ == '__main__':
(options, args) = config.parse_options(oparser)
try:
conf, app = config.load_paste_app('glance-reaper', options, args)
conf, app = config.load_paste_app('glance-cleaner', options, args)
app.run()
except RuntimeError, e:
sys.exit("ERROR: %s" % e)
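
Since the cleaner is meant to run as a periodic task, a crontab
entry along these lines would run it hourly (the install path is
hypothetical; adjust it to wherever your packaging places the
cleaner script):

    # hypothetical path to the installed cleaner script
    0 * * * * /usr/local/bin/glance-cache-cleaner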

View File

@@ -507,15 +507,43 @@ Configuration Options Affecting the Image Cache
The following configuration file options affect the image cache.
* ``image_cache_datadir=PATH``
* ``image_cache_dir=PATH``
Required when image cache middleware is enabled.
Default: ``/var/lib/glance/image-cache``
This is the root directory where the image cache will write its
cached image files. Make sure the directory is writeable by the
user running the ``glance-api`` server
This is the base directory the image cache can write files to.
Make sure the directory is writeable by the user running the
``glance-api`` server.
* ``image_cache_driver=DRIVER``
Optional. Choice of ``sqlite`` or ``xattr``
Default: ``sqlite``
The default ``sqlite`` cache driver has no special dependencies, other
than the ``python-sqlite3`` library, which is installed on virtually
all operating systems with modern versions of Python. It stores
information about the cached files in a SQLite database.
The ``xattr`` cache driver requires the ``python-xattr>=0.6.0`` library
and requires that the filesystem containing ``image_cache_dir`` have
access times tracked for all files (in other words, the noatime option
CANNOT be set for that filesystem). In addition, ``user_xattr`` must be
set on the filesystem's description line in fstab. Because of these
requirements, the ``xattr`` cache driver is not available on Windows.
* ``image_cache_sqlite_db=DB_FILE``
Optional.
Default: ``cache.db``
When using the ``sqlite`` cache driver, you can set the name of the database
that will be used to store information about the cached images. The database
file is always contained in the ``image_cache_dir``.
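
For example, a minimal cache section of ``glance-api.conf`` that
spells out all three options at their default values looks like:

    image_cache_dir = /var/lib/glance/image-cache/
    image_cache_driver = sqlite
    image_cache_sqlite_db = cache.db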
Configuring the Glance Registry
-------------------------------

View File

@@ -178,8 +178,8 @@ scrubber_datadir = /var/lib/glance/scrubber
# =============== Image Cache Options =============================
# Directory that the Image Cache writes data to
image_cache_datadir = /var/lib/glance/image-cache/
# Base directory that the Image Cache uses
image_cache_dir = /var/lib/glance/image-cache/
[pipeline:glance-api]
pipeline = versionnegotiation context apiv1app

View File

@@ -11,11 +11,11 @@ log_file = /var/log/glance/image-cache.log
use_syslog = False
# Directory that the Image Cache writes data to
image_cache_datadir = /var/lib/glance/image-cache/
image_cache_dir = /var/lib/glance/image-cache/
# Number of seconds after which we should consider an incomplete image to be
# stalled and eligible for reaping
image_cache_stall_timeout = 86400
image_cache_stall_time = 86400
# image_cache_invalid_entry_grace_period - seconds
#
@@ -27,18 +27,8 @@ image_cache_stall_timeout = 86400
# are eligible to be reaped.
image_cache_invalid_entry_grace_period = 3600
image_cache_max_size_bytes = 1073741824
# Percentage of the cache that should be freed (in addition to the overage)
# when the cache is pruned
#
# A percentage of 0% means we prune only as many files as needed to remain
# under the cache's max_size. This is space efficient but will lead to
# constant pruning as the size bounces just-above and just-below the max_size.
#
# To mitigate this 'thrashing', you can specify an additional amount of the
# cache that should be tossed out on each prune.
image_cache_percent_extra_to_free = 0.20
# Max cache size in bytes
image_cache_max_size = 1073741824
# Address to find the registry server
registry_host = 0.0.0.0
@@ -52,5 +42,5 @@ paste.app_factory = glance.image_cache.pruner:app_factory
[app:glance-prefetcher]
paste.app_factory = glance.image_cache.prefetcher:app_factory
[app:glance-reaper]
paste.app_factory = glance.image_cache.reaper:app_factory
[app:glance-cleaner]
paste.app_factory = glance.image_cache.cleaner:app_factory

View File

@@ -1,24 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
log_file = /var/log/glance/prefetcher.log
# Send logs to syslog (/dev/log) instead of to file specified by `log_file`
use_syslog = False
# Directory that the Image Cache writes data to
# Make sure this is also set in glance-api.conf
image_cache_datadir = /var/lib/glance/image-cache/
# Address to find the registry server
registry_host = 0.0.0.0
# Port the registry server is listening on
registry_port = 9191
[app:glance-prefetcher]
paste.app_factory = glance.image_cache.prefetcher:app_factory

View File

@@ -1,31 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
log_file = /var/log/glance/pruner.log
# Send logs to syslog (/dev/log) instead of to file specified by `log_file`
use_syslog = False
image_cache_max_size_bytes = 1073741824
# Percentage of the cache that should be freed (in addition to the overage)
# when the cache is pruned
#
# A percentage of 0% means we prune only as many files as needed to remain
# under the cache's max_size. This is space efficient but will lead to
# constant pruning as the size bounces just-above and just-below the max_size.
#
# To mitigate this 'thrashing', you can specify an additional amount of the
# cache that should be tossed out on each prune.
image_cache_percent_extra_to_free = 0.20
# Directory that the Image Cache writes data to
# Make sure this is also set in glance-api.conf
image_cache_datadir = /var/lib/glance/image-cache/
[app:glance-pruner]
paste.app_factory = glance.image_cache.pruner:app_factory

View File

@@ -1,32 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
log_file = /var/log/glance/reaper.log
# Send logs to syslog (/dev/log) instead of to file specified by `log_file`
use_syslog = False
# Directory that the Image Cache writes data to
# Make sure this is also set in glance-api.conf
image_cache_datadir = /var/lib/glance/image-cache/
# image_cache_invalid_entry_grace_period - seconds
#
# If an exception is raised as we're writing to the cache, the cache-entry is
# deemed invalid and moved to <image_cache_datadir>/invalid so that it can be
# inspected for debugging purposes.
#
# This is number of seconds to leave these invalid images around before they
# are elibible to be reaped.
image_cache_invalid_entry_grace_period = 3600
# Number of seconds after which we should consider an incomplete image to be
# stalled and eligible for reaping
image_cache_stall_timeout = 86400
[app:glance-reaper]
paste.app_factory = glance.image_cache.reaper:app_factory

View File

@@ -27,7 +27,8 @@ the local cached copy of the image file is returned.
import httplib
import logging
import re
import shutil
import webob
from glance import image_cache
from glance import registry
@@ -36,8 +37,6 @@ from glance.common import exception
from glance.common import utils
from glance.common import wsgi
import webob
logger = logging.getLogger(__name__)
get_images_re = re.compile(r'^(/v\d+)*/images/(.+)$')
@@ -48,8 +47,7 @@ class CacheFilter(wsgi.Middleware):
self.options = options
self.cache = image_cache.ImageCache(options)
self.serializer = images.ImageSerializer()
logger.info(_("Initialized image cache middleware using datadir: %s"),
options.get('image_cache_datadir'))
logger.info(_("Initialized image cache middleware"))
super(CacheFilter, self).__init__(app)
def process_request(self, request):
@@ -67,7 +65,15 @@ return None
return None
image_id = match.group(2)
if self.cache.hit(image_id):
# /images/detail is unfortunately supported, so here we
# cut out those requests and anything with a query
# parameter...
# See LP Bug #879136
if '?' in image_id or image_id == 'detail':
return None
if self.cache.is_cached(image_id):
logger.debug(_("Cache hit for image '%s'"), image_id)
image_iterator = self.get_from_cache(image_id)
context = request.context
@@ -83,24 +89,6 @@
"however the registry did not contain metadata for "
"that image!" % image_id)
logger.error(msg)
return None
# Make sure we're not already prefetching or caching the image
# that just generated the miss
if self.cache.is_image_currently_prefetching(image_id):
logger.debug(_("Image '%s' is already being prefetched,"
" not tee'ing into the cache"), image_id)
return None
elif self.cache.is_image_currently_being_written(image_id):
logger.debug(_("Image '%s' is already being cached,"
" not tee'ing into the cache"), image_id)
return None
# NOTE(sirp): If we're about to download and cache an
# image which is currently in the prefetch queue, just
# delete the queue items since we're caching it anyway
if self.cache.is_image_queued_for_prefetch(image_id):
self.cache.delete_queued_prefetch_image(image_id)
return None
def process_response(self, resp):
@@ -120,27 +108,13 @@
return resp
image_id = match.group(2)
if not self.cache.hit(image_id):
# Make sure we're not already prefetching or caching the image
# that just generated the miss
if self.cache.is_image_currently_prefetching(image_id):
logger.debug(_("Image '%s' is already being prefetched,"
" not tee'ing into the cache"), image_id)
return resp
if self.cache.is_image_currently_being_written(image_id):
logger.debug(_("Image '%s' is already being cached,"
" not tee'ing into the cache"), image_id)
return resp
if '?' in image_id or image_id == 'detail':
return resp
logger.debug(_("Tee'ing image '%s' into cache"), image_id)
# TODO(jaypipes): This is so incredibly wasteful, but because
# the image cache needs the image's name, we have to do this.
# In the next iteration, remove the image cache's need for
# any attribute other than the id...
image_meta = registry.get_image_metadata(request.context,
image_id)
resp.app_iter = self.get_from_store_tee_into_cache(
image_meta, resp.app_iter)
if self.cache.is_cached(image_id):
return resp
resp.app_iter = self.cache.get_caching_iter(image_id, resp.app_iter)
return resp
def get_status_code(self, response):
@@ -152,13 +126,6 @@
return response.status_int
return response.status
def get_from_store_tee_into_cache(self, image_meta, image_iterator):
"""Called if cache miss"""
with self.cache.open(image_meta, "wb") as cache_file:
for chunk in image_iterator:
cache_file.write(chunk)
yield chunk
def get_from_cache(self, image_id):
"""Called if cache hit"""
with self.cache.open_for_read(image_id) as cache_file:

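For completeness, wiring the caching middleware into a paste
deployment would look roughly like the snippet below. This is a
sketch only: the ``cache`` filter name and the
``CacheFilter.factory`` path are assumptions based on the
wsgi.Middleware conventions used above, not something this diff
shows.

    [pipeline:glance-api-caching]
    pipeline = versionnegotiation context cache apiv1app

    [filter:cache]
    paste.filter_factory = glance.api.middleware.cache:CacheFilter.factory
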
View File

@@ -117,6 +117,11 @@ class BadStoreConfiguration(GlanceException):
"Reason: %(reason)s")
class BadDriverConfiguration(GlanceException):
message = _("Driver %(driver_name)s could not be configured correctly. "
"Reason: %(reason)s")
class StoreDeleteNotSupported(GlanceException):
message = _("Deleting images from this store is not supported.")

View File

@@ -19,203 +19,85 @@
LRU Cache for Image Data
"""
from contextlib import contextmanager
import datetime
import itertools
import logging
import os
import sys
import time
from glance.common import config
from glance.common import exception
from glance.common import utils as cutils
from glance import utils
from glance.common import utils
logger = logging.getLogger(__name__)
DEFAULT_MAX_CACHE_SIZE = 10 * 1024 * 1024 * 1024 # 10 GB
class ImageCache(object):
"""
Provides an LRU cache for image data.
Assumptions
===========
"""Provides an LRU cache for image data."""
1. Cache data directory exists on a filesytem that updates atime on
reads ('noatime' should NOT be set)
2. Cache data directory exists on a filesystem that supports xattrs.
This is optional, but highly recommended since it allows us to
present ops with useful information pertaining to the cache, like
human readable filenames and statistics.
3. `glance-prune` is scheduled to run as a periodic job via cron. This
is needed to run the LRU prune strategy to keep the cache size
within the limits set by the config file.
Cache Directory Notes
=====================
The image cache data directory contains the main cache path, where the
active cache entries and subdirectories for handling partial downloads
and errored-out cache images.
The layout looks like:
image-cache/
entry1
entry2
...
incomplete/
invalid/
prefetch/
prefetching/
"""
def __init__(self, options):
self.options = options
self._make_cache_directory_if_needed()
self.init_driver()
def _make_cache_directory_if_needed(self):
"""Creates main cache directory along with incomplete subdirectory"""
# NOTE(sirp): making the incomplete_path will have the effect of
# creating the main cache path directory as well
paths = [self.incomplete_path, self.invalid_path, self.prefetch_path,
self.prefetching_path]
for path in paths:
cutils.safe_mkdirs(path)
@property
def path(self):
"""This is the base path for the image cache"""
datadir = self.options['image_cache_datadir']
return datadir
@property
def incomplete_path(self):
"""This provides a temporary place to write our cache entries so that
we we're not storing incomplete objects in the cache directly.
When the file is finished writing to, it is moved from the incomplete
path back out into the main cache directory.
The incomplete_path is a subdirectory of the main cache path to ensure
that they both reside on the same filesystem and thus making moves
cheap.
def init_driver(self):
"""
return os.path.join(self.path, 'incomplete')
@property
def invalid_path(self):
"""Place to move corrupted images
If an exception is raised while we're writing an image to the
incomplete_path, we move the incomplete image to here.
Create the driver for the cache
"""
return os.path.join(self.path, 'invalid')
@property
def prefetch_path(self):
"""This contains a list of image ids that should be pre-fetched into
the cache
"""
return os.path.join(self.path, 'prefetch')
@property
def prefetching_path(self):
"""This contains image ids that currently being prefetched"""
return os.path.join(self.path, 'prefetching')
def path_for_image(self, image_id):
"""This crafts an absolute path to a specific entry"""
return os.path.join(self.path, str(image_id))
def incomplete_path_for_image(self, image_id):
"""This crafts an absolute path to a specific entry in the incomplete
directory
"""
return os.path.join(self.incomplete_path, str(image_id))
def invalid_path_for_image(self, image_id):
"""This crafts an absolute path to a specific entry in the invalid
directory
"""
return os.path.join(self.invalid_path, str(image_id))
@contextmanager
def open(self, image_meta, mode="rb"):
"""Open a cache image for reading or writing.
We have two possible scenarios:
1. READ: we should attempt to read the file from the cache's
main directory
2. WRITE: we should write to a file under the cache's incomplete
directory, and when it's finished, move it out the main cache
directory.
"""
if mode == 'wb':
with self._open_write(image_meta, mode) as cache_file:
yield cache_file
elif mode == 'rb':
with self._open_read(image_meta, mode) as cache_file:
yield cache_file
else:
# NOTE(sirp): `rw` and `a' modes are not supported since image
# data is immutable, we `wb` it once, then `rb` multiple times.
raise Exception(_("mode '%s' not supported") % mode)
@contextmanager
def _open_write(self, image_meta, mode):
image_id = image_meta['id']
incomplete_path = self.incomplete_path_for_image(image_id)
def set_xattr(key, value):
utils.set_xattr(incomplete_path, key, value)
def commit():
set_xattr('image_name', image_meta['name'])
set_xattr('hits', 0)
final_path = self.path_for_image(image_id)
logger.debug(_("fetch finished, commiting by moving "
"'%(incomplete_path)s' to '%(final_path)s'"),
dict(incomplete_path=incomplete_path,
final_path=final_path))
os.rename(incomplete_path, final_path)
def rollback(e):
set_xattr('image_name', image_meta['name'])
set_xattr('error', "%s" % e)
invalid_path = self.invalid_path_for_image(image_id)
logger.debug(_("fetch errored, rolling back by moving "
"'%(incomplete_path)s' to '%(invalid_path)s'"),
dict(incomplete_path=incomplete_path,
invalid_path=invalid_path))
os.rename(incomplete_path, invalid_path)
driver_name = self.options.get('image_cache_driver', 'sqlite')
driver_module = (__name__ + '.drivers.' + driver_name + '.Driver')
try:
with open(incomplete_path, mode) as cache_file:
set_xattr('expected_size', image_meta['size'])
yield cache_file
except Exception as e:
rollback(e)
raise
else:
commit()
self.driver_class = utils.import_class(driver_module)
logger.info(_("Image cache loaded driver '%s'.") %
driver_name)
except exception.ImportFailure, import_err:
logger.warn(_("Image cache driver "
"'%(driver_name)s' failed to load. "
"Got error: '%(import_err)s'.") % locals())
@contextmanager
def open_for_read(self, image_id):
path = self.path_for_image(image_id)
with open(path, 'rb') as cache_file:
yield cache_file
driver_module = __name__ + '.drivers.sqlite.Driver'
logger.info(_("Defaulting to SQLite driver."))
self.driver_class = utils.import_class(driver_module)
self.configure_driver()
utils.inc_xattr(path, 'hits') # bump the hit count
def configure_driver(self):
"""
Configure the driver for the cache and, if it fails to configure,
fall back to using the SQLite driver which has no odd dependencies
"""
try:
self.driver = self.driver_class(self.options)
self.driver.configure()
except exception.BadDriverConfiguration, config_err:
logger.warn(_("Image cache driver "
"'%(driver_module)s' failed to configure. "
"Got error: '%(config_err)s'") % locals())
logger.info(_("Defaulting to SQLite driver."))
driver_module = __name__ + '.drivers.sqlite.Driver'
self.driver_class = utils.import_class(driver_module)
self.driver = self.driver_class(self.options)
self.driver.configure()
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
file cached.
:param image_id: Image ID
"""
return self.driver.is_cached(image_id)
def is_queued(self, image_id):
"""
Returns True if the image identifier is in our cache queue.
:param image_id: Image ID
"""
return self.driver.is_queued(image_id)
def get_cache_size(self):
"""
Returns the total size in bytes of the image cache.
"""
return self.driver.get_cache_size()
def get_hit_count(self, image_id):
"""
@@ -223,234 +105,148 @@ class ImageCache(object):
:param image_id: Opaque image identifier
"""
path = self.path_for_image(image_id)
return int(utils.get_xattr(path, 'hits', default=0))
return self.driver.get_hit_count(image_id)
@contextmanager
def _open_read(self, image_meta, mode):
image_id = image_meta['id']
path = self.path_for_image(image_id)
with open(path, mode) as cache_file:
yield cache_file
utils.inc_xattr(path, 'hits') # bump the hit count
def hit(self, image_id):
return os.path.exists(self.path_for_image(image_id))
@staticmethod
def _delete_file(path):
if os.path.exists(path):
logger.debug(_("deleting image cache file '%s'"), path)
os.unlink(path)
else:
logger.warn(_("image cache file '%s' doesn't exist, unable to"
" delete"), path)
def purge(self, image_id):
path = self.path_for_image(image_id)
self._delete_file(path)
def clear(self):
purged = 0
for path in self.get_all_regular_files(self.path):
self._delete_file(path)
purged += 1
return purged
def is_image_currently_being_written(self, image_id):
"""Returns true if we're currently downloading an image"""
incomplete_path = self.incomplete_path_for_image(image_id)
return os.path.exists(incomplete_path)
def is_currently_prefetching_any_images(self):
"""True if we are currently prefetching an image.
We only allow one prefetch to occur at a time.
def delete_all(self):
"""
return len(os.listdir(self.prefetching_path)) > 0
def is_image_queued_for_prefetch(self, image_id):
prefetch_path = os.path.join(self.prefetch_path, str(image_id))
return os.path.exists(prefetch_path)
def is_image_currently_prefetching(self, image_id):
prefetching_path = os.path.join(self.prefetching_path, str(image_id))
return os.path.exists(prefetching_path)
def queue_prefetch(self, image_meta):
"""This adds a image to be prefetched to the queue directory.
If the image already exists in the queue directory or the
prefetching directory, we ignore it.
Removes all cached image files and any attributes about the images
and returns the number of cached image files that were deleted.
"""
image_id = image_meta['id']
return self.driver.delete_all()
if self.hit(image_id):
msg = _("Skipping prefetch, image '%s' already cached") % image_id
logger.warn(msg)
raise exception.Invalid(msg)
if self.is_image_currently_prefetching(image_id):
msg = _("Skipping prefetch, already prefetching "
"image '%s'") % image_id
logger.warn(msg)
raise exception.Invalid(msg)
if self.is_image_queued_for_prefetch(image_id):
msg = _("Skipping prefetch, image '%s' already queued for"
" prefetching") % image_id
logger.warn(msg)
raise exception.Invalid(msg)
prefetch_path = os.path.join(self.prefetch_path, str(image_id))
# Touch the file to add it to the queue
with open(prefetch_path, "w") as f:
pass
utils.set_xattr(prefetch_path, 'image_name', image_meta['name'])
def delete_queued_prefetch_image(self, image_id):
prefetch_path = os.path.join(self.prefetch_path, str(image_id))
self._delete_file(prefetch_path)
def delete_prefetching_image(self, image_id):
prefetching_path = os.path.join(self.prefetching_path, str(image_id))
self._delete_file(prefetching_path)
def pop_prefetch_item(self):
"""This returns the next prefetch job.
The prefetch directory is treated like a FIFO; so we sort by modified
time and pick the oldest.
def delete(self, image_id):
"""
items = []
for path in self.get_all_regular_files(self.prefetch_path):
mtime = os.path.getmtime(path)
items.append((mtime, path))
Removes a specific cached image file and any attributes about the image
if not items:
raise IndexError
# Sort oldest files to the end of the list
items.sort(reverse=True)
mtime, path = items.pop()
image_id = os.path.basename(path)
return image_id
def do_prefetch(self, image_id):
"""This moves the file from the prefetch queue path to the in-progress
prefetching path (so we don't try to prefetch something twice).
:param image_id: Image ID
"""
prefetch_path = os.path.join(self.prefetch_path, str(image_id))
prefetching_path = os.path.join(self.prefetching_path, str(image_id))
os.rename(prefetch_path, prefetching_path)
self.driver.delete(image_id)
@staticmethod
def get_all_regular_files(basepath):
for fname in os.listdir(basepath):
path = os.path.join(basepath, fname)
if os.path.isfile(path):
yield path
def _base_entries(self, basepath):
def iso8601_from_timestamp(timestamp):
return datetime.datetime.utcfromtimestamp(timestamp)\
.isoformat()
for path in self.get_all_regular_files(basepath):
filename = os.path.basename(path)
try:
image_id = int(filename)
except ValueError, TypeError:
continue
entry = {}
entry['id'] = image_id
entry['path'] = path
entry['name'] = utils.get_xattr(path, 'image_name',
default='UNKNOWN')
mtime = os.path.getmtime(path)
entry['last_modified'] = iso8601_from_timestamp(mtime)
atime = os.path.getatime(path)
entry['last_accessed'] = iso8601_from_timestamp(atime)
entry['size'] = os.path.getsize(path)
entry['expected_size'] = utils.get_xattr(
path, 'expected_size', default='UNKNOWN')
yield entry
def invalid_entries(self):
"""Cache info for invalid cached images"""
for entry in self._base_entries(self.invalid_path):
path = entry['path']
entry['error'] = utils.get_xattr(path, 'error', default='UNKNOWN')
yield entry
def incomplete_entries(self):
"""Cache info for incomplete cached images"""
for entry in self._base_entries(self.incomplete_path):
yield entry
def prefetch_entries(self):
"""Cache info for both queued and in-progress prefetch jobs"""
both_entries = itertools.chain(
self._base_entries(self.prefetch_path),
self._base_entries(self.prefetching_path))
for entry in both_entries:
path = entry['path']
entry['status'] = 'in-progress' if 'prefetching' in path\
else 'queued'
yield entry
def entries(self):
"""Cache info for currently cached images"""
for entry in self._base_entries(self.path):
path = entry['path']
entry['hits'] = utils.get_xattr(path, 'hits', default='UNKNOWN')
yield entry
def _reap_old_files(self, dirpath, entry_type, grace=None):
def prune(self):
"""
Removes all cached image files above the cache's maximum
size. Returns a tuple containing the total number of cached
files removed and the total size of all pruned image files.
"""
now = time.time()
reaped = 0
for path in self.get_all_regular_files(dirpath):
mtime = os.path.getmtime(path)
age = now - mtime
if not grace:
logger.debug(_("No grace period, reaping '%(path)s'"
" immediately"), locals())
self._delete_file(path)
reaped += 1
elif age > grace:
logger.debug(_("Cache entry '%(path)s' exceeds grace period, "
"(%(age)i s > %(grace)i s)"), locals())
self._delete_file(path)
reaped += 1
max_size = int(self.options.get('image_cache_max_size',
DEFAULT_MAX_CACHE_SIZE))
current_size = self.driver.get_cache_size()
if max_size > current_size:
logger.debug(_("Image cache has free space, skipping prune..."))
return (0, 0)
logger.info(_("Reaped %(reaped)s %(entry_type)s cache entries"),
locals())
return reaped
overage = current_size - max_size
logger.debug(_("Image cache currently %(overage)d bytes over max "
"size. Starting prune to max size of %(max_size)d ") %
locals())
def reap_invalid(self, grace=None):
"""Remove any invalid cache entries
total_bytes_pruned = 0
total_files_pruned = 0
entry = self.driver.get_least_recently_accessed()
while entry and current_size > max_size:
image_id, size = entry
logger.debug(_("Pruning '%(image_id)s' to free %(size)d bytes"),
{'image_id': image_id, 'size': size})
self.driver.delete(image_id)
total_bytes_pruned = total_bytes_pruned + size
total_files_pruned = total_files_pruned + 1
current_size = current_size - size
entry = self.driver.get_least_recently_accessed()
:param grace: Number of seconds to keep an invalid entry around for
debugging purposes. If None, then delete immediately.
logger.debug(_("Pruning finished. "
"Pruned %(total_files_pruned)d files and "
"%(total_bytes_pruned)d bytes.") % locals())
return total_files_pruned, total_bytes_pruned
def clean(self):
"""
return self._reap_old_files(self.invalid_path, 'invalid', grace=grace)
Cleans up any invalid or incomplete cached images. The cache driver
decides what that means...
"""
self.driver.clean()
def reap_stalled(self):
"""Remove any stalled cache entries"""
stall_timeout = int(self.options.get('image_cache_stall_timeout',
86400))
return self._reap_old_files(self.incomplete_path, 'stalled',
grace=stall_timeout)
def queue_image(self, image_id):
"""
This adds an image to the queue to be cached.
If the image already exists in the queue or has already been
cached, we return False; otherwise we return True.
:param image_id: Image ID
"""
return self.driver.queue_image(image_id)
def get_caching_iter(self, image_id, image_iter):
"""
Returns an iterator that caches the contents of an image
while the image contents are read through the supplied
iterator.
:param image_id: Image ID
:param image_iter: Iterator that will read image contents
"""
if not self.driver.is_cacheable(image_id):
return image_iter
logger.debug(_("Tee'ing image '%s' into cache"), image_id)
def tee_iter(image_id):
with self.driver.open_for_write(image_id) as cache_file:
for chunk in image_iter:
cache_file.write(chunk)
yield chunk
cache_file.flush()
return tee_iter(image_id)
def cache_image_iter(self, image_id, image_iter):
"""
Cache an image with supplied iterator.
:param image_id: Image ID
:param image_iter: Iterator retrieving image chunks
:retval True if image file was cached, False otherwise
"""
if not self.driver.is_cacheable(image_id):
return False
with self.driver.open_for_write(image_id) as cache_file:
for chunk in image_iter:
cache_file.write(chunk)
cache_file.flush()
return True
def cache_image_file(self, image_id, image_file):
"""
Cache an image file.
:param image_id: Image ID
:param image_file: Image file to cache
:retval True if image file was cached, False otherwise
"""
CHUNKSIZE = 64 * 1024 * 1024
return self.cache_image_iter(image_id,
utils.chunkiter(image_file, CHUNKSIZE))
def open_for_read(self, image_id):
"""
Open and yield file for reading the image file for an image
with supplied identifier.
:note Upon successful reading of the image file, the image's
hit count will be incremented.
:param image_id: Image ID
"""
return self.driver.open_for_read(image_id)
def get_cache_queue(self):
"""
Returns a list of image IDs that are in the queue. The
list should be sorted by the time the image ID was inserted
into the queue.
"""
return self.driver.get_cache_queue()
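
Taken together, the refactored API can be exercised roughly like
this (a sketch; the options dict and image id are illustrative,
and the options would normally come in from the paste config):

    from glance.image_cache import ImageCache

    options = {'image_cache_dir': '/var/lib/glance/image-cache/',
               'image_cache_driver': 'sqlite'}
    cache = ImageCache(options)

    image_id = 'some-image-id'  # illustrative
    if not (cache.is_cached(image_id) or cache.is_queued(image_id)):
        cache.queue_image(image_id)

    # On a cache miss, the middleware tees the store's response
    # iterator into the cache while streaming it to the client:
    # resp.app_iter = cache.get_caching_iter(image_id, resp.app_iter)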

View File

@@ -16,30 +16,26 @@
# under the License.
"""
Reaps any invalid cache entries that exceed the grace period
Cleans up any invalid or incomplete cache entries
"""
import logging
from glance.image_cache import ImageCache
logger = logging.getLogger('glance.image_cache.reaper')
logger = logging.getLogger(__name__)
class Reaper(object):
class Cleaner(object):
def __init__(self, options):
self.options = options
self.cache = ImageCache(options)
def run(self):
invalid_grace = int(self.options.get(
'image_cache_invalid_entry_grace_period',
3600))
self.cache.reap_invalid(grace=invalid_grace)
self.cache.reap_stalled()
self.cache.clean()
def app_factory(global_config, **local_conf):
conf = global_config.copy()
conf.update(local_conf)
return Reaper(conf)
return Cleaner(conf)

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,139 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base attribute driver class
"""
from contextlib import contextmanager
class Driver(object):
def __init__(self, options):
"""
Initialize the attribute driver with a set of options.
:param options: Dictionary of configuration file options
:raises `exception.BadDriverConfiguration` if configuration of the
driver fails for any reason.
"""
self.options = options or {}
def configure(self):
"""
Configure the driver to use the stored configuration options
Any driver that needs special configuration should implement
this method. If the driver was not able to successfully configure
itself, it should raise `exception.BadDriverConfiguration`
"""
pass
def get_cache_size(self):
"""
Returns the total size in bytes of the image cache.
"""
raise NotImplementedError
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
file cached.
:param image_id: Image ID
"""
raise NotImplementedError
def is_cacheable(self, image_id):
"""
Returns True if the image with the supplied ID can have its
image file cached, False otherwise.
:param image_id: Image ID
"""
raise NotImplementedError
def is_queued(self, image_id):
"""
Returns True if the image identifier is in our cache queue.
:param image_id: Image ID
"""
raise NotImplementedError
def delete_all(self):
"""
Removes all cached image files and any attributes about the images
and returns the number of cached image files that were deleted.
"""
raise NotImplementedError
def delete(self, image_id):
"""
Removes a specific cached image file and any attributes about the image
:param image_id: Image ID
"""
raise NotImplementedError
def queue_image(self, image_id):
"""
Puts an image identifier in a queue for caching. Return True
on successful add to the queue, False otherwise...
:param image_id: Image ID
"""
raise NotImplementedError
def clean(self):
"""
Dependent on the driver, clean up and destroy any invalid or incomplete
cached images
"""
raise NotImplementedError
def get_least_recently_accessed(self):
"""
Return a tuple containing the image_id and size of the least recently
accessed cached file, or None if no cached files.
"""
raise NotImplementedError
def open_for_write(self, image_id):
"""
Open a file for writing the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
raise NotImplementedError
def open_for_read(self, image_id):
"""
Open and yield file for reading the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
raise NotImplementedError
def get_cache_queue(self):
"""
Returns a list of image IDs that are in the queue. The
list should be sorted by the time the image ID was inserted
into the queue.
"""
raise NotImplementedError
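
To make the contract concrete, a minimal (purely illustrative)
driver that satisfies this interface by never caching anything
might look like the sketch below; open_for_read() and
open_for_write() are omitted because with is_cached() and
is_cacheable() always False they are never reached:

    from glance.image_cache.drivers import base

    class NullDriver(base.Driver):
        """Illustrative only: a driver that never caches anything."""

        def get_cache_size(self):
            return 0

        def is_cached(self, image_id):
            return False

        def is_cacheable(self, image_id):
            return False

        def is_queued(self, image_id):
            return False

        def queue_image(self, image_id):
            return False

        def delete_all(self):
            return 0

        def delete(self, image_id):
            pass

        def clean(self):
            pass

        def get_least_recently_accessed(self):
            return None

        def get_cache_queue(self):
            return []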

View File

@@ -0,0 +1,507 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cache driver that uses SQLite to store information about cached images
"""
from __future__ import absolute_import
from contextlib import contextmanager
import datetime
import itertools
import logging
import os
import stat
import time
from eventlet import sleep, timeout
import sqlite3
from glance.common import exception
from glance.common import utils
from glance.image_cache.drivers import base
logger = logging.getLogger(__name__)
DEFAULT_SQL_CALL_TIMEOUT = 2
DEFAULT_STALL_TIME = 86400 # 24 hours
DEFAULT_SQLITE_DB = 'cache.db'
class SqliteConnection(sqlite3.Connection):
"""
SQLite DB Connection handler that plays well with eventlet,
slightly modified from Swift's similar code.
"""
def __init__(self, *args, **kwargs):
self.timeout_seconds = kwargs.get('timeout', DEFAULT_SQL_CALL_TIMEOUT)
kwargs['timeout'] = 0
sqlite3.Connection.__init__(self, *args, **kwargs)
def _timeout(self, call):
with timeout.Timeout(self.timeout_seconds):
while True:
try:
return call()
except sqlite3.OperationalError, e:
if 'locked' not in str(e):
raise
sleep(0.05)
def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute(
self, *args, **kwargs))
def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self))
class Driver(base.Driver):
"""
Cache driver that uses SQLite to store information about cached images.
"""
def configure(self):
"""
Configure the driver to use the stored configuration options
Any driver that needs special configuration should implement
this method. If the driver was not able to successfully configure
itself, it should raise `exception.BadDriverConfiguration`
"""
# Here we set up the various file-based image cache paths
# that we need in order to find the files in different states
# of cache management. Once we establish these paths, we create
# the SQLite database that will hold our cache attributes
self.set_paths()
self.initialize_db()
def set_paths(self):
"""
Creates all necessary directories under the base cache directory
"""
self.base_dir = self.options.get('image_cache_dir')
self.incomplete_dir = os.path.join(self.base_dir, 'incomplete')
self.invalid_dir = os.path.join(self.base_dir, 'invalid')
self.queue_dir = os.path.join(self.base_dir, 'queue')
dirs = [self.incomplete_dir, self.invalid_dir, self.queue_dir]
for path in dirs:
utils.safe_mkdirs(path)
def initialize_db(self):
db = self.options.get('image_cache_sqlite_db', DEFAULT_SQLITE_DB)
self.db_path = os.path.join(self.base_dir, db)
try:
conn = sqlite3.connect(self.db_path, check_same_thread=False,
factory=SqliteConnection)
conn.executescript("""
CREATE TABLE IF NOT EXISTS cached_images (
image_id TEXT PRIMARY KEY,
last_access REAL DEFAULT 0.0,
size INTEGER DEFAULT 0,
hits INTEGER DEFAULT 0,
checksum TEXT
);
""")
conn.close()
except sqlite3.DatabaseError, e:
msg = _("Failed to initialize the image cache database. "
"Got error: %s") % e
logger.error(msg)
raise exception.BadDriverConfiguration(driver_name='sqlite', reason=msg)
def get_cache_size(self):
"""
Returns the total size in bytes of the image cache.
"""
sizes = []
for path in self.get_cache_files(self.base_dir):
if path == self.db_path:
continue
file_info = os.stat(path)
sizes.append(file_info[stat.ST_SIZE])
return sum(sizes)
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
file cached.
:param image_id: Image ID
"""
return os.path.exists(self.get_image_filepath(image_id))
def is_cacheable(self, image_id):
"""
Returns True if the image with the supplied ID can have its
image file cached, False otherwise.
:param image_id: Image ID
"""
# Make sure we're not already cached or caching the image
return not (self.is_cached(image_id) or
self.is_being_cached(image_id))
def is_being_cached(self, image_id):
"""
Returns True if the image with supplied id is currently
in the process of having its image file cached.
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id, 'incomplete')
return os.path.exists(path)
def is_queued(self, image_id):
"""
Returns True if the image identifier is in our cache queue.
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id, 'queue')
return os.path.exists(path)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
hits = 0
with self.get_db() as db:
cur = db.execute("""SELECT hits FROM cached_images
WHERE image_id = ?""",
(image_id,))
hits = cur.fetchone()[0]
return hits
def delete_all(self):
"""
Removes all cached image files and any attributes about the images
"""
deleted = 0
with self.get_db() as db:
for path in self.get_cache_files(self.base_dir):
delete_cached_file(path)
deleted += 1
db.execute("""DELETE FROM cached_images""")
db.commit()
return deleted
def delete(self, image_id):
"""
Removes a specific cached image file and any attributes about the image
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id)
with self.get_db() as db:
delete_cached_file(path)
db.execute("""DELETE FROM cached_images WHERE image_id = ?""",
(image_id, ))
db.commit()
def clean(self):
"""
Delete any image files in the invalid directory and any
files in the incomplete directory that are older than a
configurable amount of time.
"""
self.delete_invalid_files()
incomplete_stall_time = int(self.options.get('image_cache_stall_time',
DEFAULT_STALL_TIME))
now = time.time()
older_than = now - incomplete_stall_time
self.delete_stalled_files(older_than)
def get_least_recently_accessed(self):
"""
Return a tuple containing the image_id and size of the least recently
accessed cached file, or None if no cached files.
"""
with self.get_db() as db:
cur = db.execute("""SELECT image_id FROM cached_images
ORDER BY last_access LIMIT 1""")
entry = cur.fetchone()
if entry is None:
    return None
image_id = entry[0]
path = self.get_image_filepath(image_id)
file_info = os.stat(path)
return image_id, file_info[stat.ST_SIZE]
@contextmanager
def open_for_write(self, image_id):
"""
Open a file for writing the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
incomplete_path = self.get_image_filepath(image_id, 'incomplete')
def commit():
with self.get_db() as db:
final_path = self.get_image_filepath(image_id)
logger.debug(_("Fetch finished, moving "
"'%(incomplete_path)s' to '%(final_path)s'"),
dict(incomplete_path=incomplete_path,
final_path=final_path))
os.rename(incomplete_path, final_path)
# Make sure that we "pop" the image from the queue...
if self.is_queued(image_id):
os.unlink(self.get_image_filepath(image_id, 'queue'))
db.execute("""INSERT INTO cached_images
(image_id, last_access, hits)
VALUES (?, 0, 0)""", (image_id, ))
db.commit()
def rollback(e):
with self.get_db() as db:
invalid_path = self.get_image_filepath(image_id, 'invalid')
logger.debug(_("Fetch of cache file failed, rolling back by "
"moving '%(incomplete_path)s' to "
"'%(invalid_path)s'") % locals())
os.rename(incomplete_path, invalid_path)
db.execute("""DELETE FROM cached_images
WHERE image_id = ?""", (image_id, ))
db.commit()
try:
with open(incomplete_path, 'wb') as cache_file:
yield cache_file
except Exception as e:
rollback(e)
raise
else:
commit()
@contextmanager
def open_for_read(self, image_id):
"""
Open and yield file for reading the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id)
with open(path, 'rb') as cache_file:
yield cache_file
now = time.time()
with self.get_db() as db:
db.execute("""UPDATE cached_images
SET hits = hits + 1, last_access = ?
WHERE image_id = ?""",
(now, image_id))
db.commit()
@contextmanager
def get_db(self):
"""
Returns a context manager that produces a database connection that
self-closes and calls rollback if an error occurs while using the
database connection
"""
conn = sqlite3.connect(self.db_path, check_same_thread=False,
factory=SqliteConnection)
conn.row_factory = sqlite3.Row
conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
try:
yield conn
except sqlite3.DatabaseError, e:
msg = _("Error executing SQLite call. Got error: %s") % e
logger.error(msg)
conn.rollback()
finally:
conn.close()
def get_image_filepath(self, image_id, cache_status='active'):
"""
This crafts an absolute path to a specific entry
:param image_id: Image ID
:param cache_status: Status of the image in the cache
"""
if cache_status == 'active':
return os.path.join(self.base_dir, str(image_id))
return os.path.join(self.base_dir, cache_status, str(image_id))
def queue_image(self, image_id):
"""
This adds an image to the queue to be cached.
If the image already exists in the queue or has already been
cached, we return False; otherwise we return True.
:param image_id: Image ID
"""
if self.is_cached(image_id):
msg = _("Not queueing image '%s'. Already cached.") % image_id
logger.warn(msg)
return False
if self.is_being_cached(image_id):
msg = _("Not queueing image '%s'. Already being "
"written to cache") % image_id
logger.warn(msg)
return False
if self.is_queued(image_id):
msg = _("Not queueing image '%s'. Already queued.") % image_id
logger.warn(msg)
return False
path = self.get_image_filepath(image_id, 'queue')
# Touch the file to add it to the queue
with open(path, "w") as f:
pass
return True
def _base_entries(self, basepath):
def iso8601_from_timestamp(timestamp):
return datetime.datetime.utcfromtimestamp(timestamp)\
.isoformat()
for path in self.get_cache_files(basepath):
filename = os.path.basename(path)
try:
image_id = int(filename)
except (ValueError, TypeError):
continue
entry = {}
entry['id'] = image_id
entry['path'] = path
entry['name'] = self.driver.get_attr(image_id, 'active',
'image_name',
default='UNKNOWN')
mtime = os.path.getmtime(path)
entry['last_modified'] = iso8601_from_timestamp(mtime)
atime = os.path.getatime(path)
entry['last_accessed'] = iso8601_from_timestamp(atime)
entry['size'] = os.path.getsize(path)
entry['expected_size'] = self.driver.get_attr(image_id,
'active', 'expected_size', default='UNKNOWN')
yield entry
def invalid_entries(self):
"""Cache info for invalid cached images"""
for entry in self._base_entries(self.invalid_dir):
path = entry['path']
entry['error'] = self.driver.get_attr(image_id, 'invalid',
'error',
default='UNKNOWN')
yield entry
def incomplete_entries(self):
"""Cache info for incomplete cached images"""
for entry in self._base_entries(self.incomplete_dir):
yield entry
def prefetch_entries(self):
"""Cache info for both queued and in-progress prefetch jobs"""
both_entries = itertools.chain(
self._base_entries(self.prefetch_path),
self._base_entries(self.prefetching_path))
for entry in both_entries:
path = entry['path']
entry['status'] = 'in-progress' if 'prefetching' in path\
else 'queued'
yield entry
def entries(self):
"""Cache info for currently cached images"""
for entry in self._base_entries(self.base_dir):
path = entry['path']
entry['hits'] = self.driver.get_attr(image_id, 'active',
'hits',
default='UNKNOWN')
yield entry
def delete_invalid_files(self):
"""
Removes any invalid cache entries
"""
for path in self.get_cache_files(self.invalid_dir):
os.unlink(path)
logger.info("Removed invalid cache file %s", path)
def delete_stalled_files(self, older_than):
"""
Removes any incomplete cache entries older than a
supplied modified time.
:param older_than: Files written to on or before this timestamp
will be deleted.
"""
for path in self.get_cache_files(self.incomplete_dir):
    if os.path.getmtime(path) < older_than:
        os.unlink(path)
        logger.info("Removed stalled cache file %s", path)
def get_cache_queue(self):
"""
Returns a list of image IDs that are in the queue. The
list should be sorted by the time the image ID was inserted
into the queue.
"""
files = [f for f in self.get_cache_files(self.queue_dir)]
items = []
for path in files:
mtime = os.path.getmtime(path)
items.append((mtime, os.path.basename(path)))
items.sort()
return [image_id for (mtime, image_id) in items]
def get_cache_files(self, basepath):
"""
Returns cache files in the supplied directory
:param basepath: Directory to look in for cache files
"""
for fname in os.listdir(basepath):
path = os.path.join(basepath, fname)
if path != self.db_path and os.path.isfile(path):
yield path
def delete_cached_file(path):
if os.path.exists(path):
logger.debug(_("Deleting image cache file '%s'"), path)
os.unlink(path)
else:
logger.warn(_("Cached image file '%s' doesn't exist, unable to"
" delete"), path)

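A nice side effect of this driver is that the cache attributes
live in an ordinary SQLite file, so an operator can inspect the
cache out-of-band with the sqlite3 command-line tool (the path
assumes the default image_cache_dir and image_cache_sqlite_db
values; the cached_images table is created by initialize_db()
above):

    $ sqlite3 /var/lib/glance/image-cache/cache.db \
        "SELECT image_id, hits, last_access FROM cached_images;"
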
View File

@@ -0,0 +1,527 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cache driver that uses xattr file tags and requires a filesystem
that has atimes set.
Assumptions
===========
1. Cache data directory exists on a filesystem that updates atime on
reads ('noatime' should NOT be set)
2. Cache data directory exists on a filesystem that supports xattrs.
This is optional, but highly recommended since it allows us to
present ops with useful information pertaining to the cache, like
human readable filenames and statistics.
3. `glance-prune` is scheduled to run as a periodic job via cron. This
is needed to run the LRU prune strategy to keep the cache size
within the limits set by the config file.
Cache Directory Notes
=====================
The image cache data directory contains the main cache path, which holds
the active cache entries, plus subdirectories for handling partial
downloads and errored-out cache images.
The layout looks like:
$image_cache_dir/
entry1
entry2
...
incomplete/
invalid/
queue/
"""
from __future__ import absolute_import
from contextlib import contextmanager
import datetime
import errno
import itertools
import logging
import os
import stat
import time
import xattr
from glance.common import exception
from glance.common import utils
from glance.image_cache.drivers import base
logger = logging.getLogger(__name__)
class Driver(base.Driver):
"""
Cache driver that uses xattr file tags and requires a filesystem
that has atimes set.
"""
def configure(self):
"""
Configure the driver to use the stored configuration options
Any driver that needs special configuration should implement
this method. If the driver was not able to successfully configure
itself, it should raise `exception.BadDriverConfiguration`
"""
# Here we set up the various file-based image cache paths
# that we need in order to find the files in different states
# of cache management. Once we establish these paths, we do
# a quick attempt to write a user xattr to a temporary file
# to check that the filesystem is even enabled to support xattrs
self.set_paths()
def set_paths(self):
"""
Creates all necessary directories under the base cache directory
"""
self.base_dir = self.options.get('image_cache_dir')
self.incomplete_dir = os.path.join(self.base_dir, 'incomplete')
self.invalid_dir = os.path.join(self.base_dir, 'invalid')
self.queue_dir = os.path.join(self.base_dir, 'queue')
dirs = [self.incomplete_dir, self.invalid_dir, self.queue_dir]
for path in dirs:
utils.safe_mkdirs(path)
def get_cache_size(self):
"""
Returns the total size in bytes of the image cache.
"""
sizes = []
for path in get_all_regular_files(self.base_dir):
file_info = os.stat(path)
sizes.append(file_info[stat.ST_SIZE])
return sum(sizes)
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
file cached.
:param image_id: Image ID
"""
return os.path.exists(self.get_image_filepath(image_id))
def is_cacheable(self, image_id):
"""
Returns True if the image with the supplied ID can have its
image file cached, False otherwise.
:param image_id: Image ID
"""
# Make sure we're not already cached or caching the image
return not (self.is_cached(image_id) or
self.is_being_cached(image_id))
def is_being_cached(self, image_id):
"""
Returns True if the image with supplied id is currently
in the process of having its image file cached.
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id, 'incomplete')
return os.path.exists(path)
def is_queued(self, image_id):
"""
Returns True if the image identifier is in our cache queue.
"""
path = self.get_image_filepath(image_id, 'queue')
return os.path.exists(path)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
path = self.get_image_filepath(image_id)
return int(get_xattr(path, 'hits', default=0))
def delete_all(self):
"""
Removes all cached image files and any attributes about the images
"""
deleted = 0
for path in get_all_regular_files(self.base_dir):
delete_cached_file(path)
deleted += 1
return deleted
def delete(self, image_id):
"""
Removes a specific cached image file and any attributes about the image
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id)
delete_cached_file(path)
def get_least_recently_accessed(self):
"""
Return a tuple containing the image_id and size of the least recently
accessed cached file, or None if no cached files.
"""
stats = []
for path in get_all_regular_files(self.base_dir):
file_info = os.stat(path)
stats.append((file_info[stat.ST_ATIME], # access time
file_info[stat.ST_SIZE], # size in bytes
path)) # absolute path
if not stats:
return None
stats.sort()
return os.path.basename(stats[0][2]), stats[0][1]
@contextmanager
def open_for_write(self, image_id):
"""
Open a file for writing the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
incomplete_path = self.get_image_filepath(image_id, 'incomplete')
def set_attr(key, value):
set_xattr(incomplete_path, key, value)
def commit():
set_attr('hits', 0)
final_path = self.get_image_filepath(image_id)
logger.debug(_("Fetch finished, moving "
"'%(incomplete_path)s' to '%(final_path)s'"),
dict(incomplete_path=incomplete_path,
final_path=final_path))
os.rename(incomplete_path, final_path)
# Make sure that we "pop" the image from the queue...
if self.is_queued(image_id):
logger.debug(_("Removing image '%s' from queue after "
"caching it."), image_id)
os.unlink(self.get_image_filepath(image_id, 'queue'))
def rollback(e):
set_attr('error', "%s" % e)
invalid_path = self.get_image_filepath(image_id, 'invalid')
logger.debug(_("Fetch of cache file failed, rolling back by "
"moving '%(incomplete_path)s' to "
"'%(invalid_path)s'") % locals())
os.rename(incomplete_path, invalid_path)
try:
with open(incomplete_path, 'wb') as cache_file:
yield cache_file
except Exception as e:
rollback(e)
raise
else:
commit()
@contextmanager
def open_for_read(self, image_id):
"""
Open and yield file for reading the image file for an image
with supplied identifier.
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id)
with open(path, 'rb') as cache_file:
yield cache_file
inc_xattr(path, 'hits', 1)  # bump the hit count
def get_image_filepath(self, image_id, cache_status='active'):
"""
This crafts an absolute path to a specific entry
:param image_id: Image ID
:param cache_status: Status of the image in the cache
"""
if cache_status == 'active':
return os.path.join(self.base_dir, str(image_id))
return os.path.join(self.base_dir, cache_status, str(image_id))
def queue_image(self, image_id):
"""
This adds an image to the queue to be cached.
If the image already exists in the queue or has already been
cached, we return False; otherwise we return True.
:param image_id: Image ID
"""
if self.is_cached(image_id):
msg = _("Not queueing image '%s'. Already cached.") % image_id
logger.warn(msg)
return False
if self.is_being_cached(image_id):
msg = _("Not queueing image '%s'. Already being "
"written to cache") % image_id
logger.warn(msg)
return False
if self.is_queued(image_id):
msg = _("Not queueing image '%s'. Already queued.") % image_id
logger.warn(msg)
return False
path = self.get_image_filepath(image_id, 'queue')
logger.debug(_("Queueing image '%s'."), image_id)
# Touch the file to add it to the queue
with open(path, "w") as f:
pass
return True
def get_cache_queue(self):
"""
Returns a list of image IDs that are in the queue. The
list should be sorted by the time the image ID was inserted
into the queue.
"""
files = [f for f in get_all_regular_files(self.queue_dir)]
items = []
for path in files:
mtime = os.path.getmtime(path)
items.append((mtime, os.path.basename(path)))
items.sort()
return [image_id for (mtime, image_id) in items]
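# Illustrative usage (not part of this patch): queueing is idempotent and
# FIFO-ordered, as exercised by test_queue() further down. `driver` is
# assumed to be a configured instance of this class:
for image_id in (0, 1, 2):
    assert driver.queue_image(image_id)   # newly queued: returns True
assert not driver.queue_image(0)          # already queued: returns False
assert driver.get_cache_queue() == ['0', '1', '2']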
def _base_entries(self, basepath):
def iso8601_from_timestamp(timestamp):
return datetime.datetime.utcfromtimestamp(timestamp)\
.isoformat()
for path in get_all_regular_files(basepath):
filename = os.path.basename(path)
try:
image_id = int(filename)
except (ValueError, TypeError):
continue
entry = {}
entry['id'] = image_id
entry['path'] = path
entry['name'] = self.driver.get_attr(image_id, 'active',
'image_name',
default='UNKNOWN')
mtime = os.path.getmtime(path)
entry['last_modified'] = iso8601_from_timestamp(mtime)
atime = os.path.getatime(path)
entry['last_accessed'] = iso8601_from_timestamp(atime)
entry['size'] = os.path.getsize(path)
entry['expected_size'] = self.driver.get_attr(image_id,
'active', 'expected_size', default='UNKNOWN')
yield entry
def invalid_entries(self):
"""Cache info for invalid cached images"""
for entry in self._base_entries(self.invalid_path):
path = entry['path']
entry['error'] = self.driver.get_attr(entry['id'], 'invalid',
'error',
default='UNKNOWN')
yield entry
def incomplete_entries(self):
"""Cache info for incomplete cached images"""
for entry in self._base_entries(self.incomplete_path):
yield entry
def prefetch_entries(self):
"""Cache info for both queued and in-progress prefetch jobs"""
both_entries = itertools.chain(
self._base_entries(self.prefetch_path),
self._base_entries(self.prefetching_path))
for entry in both_entries:
path = entry['path']
entry['status'] = 'in-progress' if 'prefetching' in path\
else 'queued'
yield entry
def entries(self):
"""Cache info for currently cached images"""
for entry in self._base_entries(self.path):
path = entry['path']
entry['hits'] = self.driver.get_attr(entry['id'], 'active',
'hits',
default='UNKNOWN')
yield entry
def _reap_old_files(self, dirpath, entry_type, grace=None):
"""
"""
now = time.time()
reaped = 0
for path in get_all_regular_files(dirpath):
mtime = os.path.getmtime(path)
age = now - mtime
if not grace:
logger.debug(_("No grace period, reaping '%(path)s'"
" immediately"), locals())
self._delete_file(path)
reaped += 1
elif age > grace:
logger.debug(_("Cache entry '%(path)s' exceeds grace period, "
"(%(age)i s > %(grace)i s)"), locals())
self._delete_file(path)
reaped += 1
logger.info(_("Reaped %(reaped)s %(entry_type)s cache entries"),
locals())
return reaped
def reap_invalid(self, grace=None):
"""Remove any invalid cache entries
:param grace: Number of seconds to keep an invalid entry around for
debugging purposes. If None, then delete immediately.
"""
return self._reap_old_files(self.invalid_path, 'invalid', grace=grace)
def reap_stalled(self):
"""Remove any stalled cache entries"""
stall_timeout = int(self.options.get('image_cache_stall_timeout',
86400))
return self._reap_old_files(self.incomplete_path, 'stalled',
grace=stall_timeout)
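# Illustrative usage (not part of this patch): a periodic cleaner can
# drive both reapers above, keeping invalid entries around for an hour of
# debugging while sweeping incomplete ones on the configured stall timeout.
# `driver` is assumed to be a configured instance of this class:
reaped = driver.reap_invalid(grace=60 * 60) + driver.reap_stalled()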
def get_all_regular_files(basepath):
for fname in os.listdir(basepath):
path = os.path.join(basepath, fname)
if os.path.isfile(path):
yield path
def delete_cached_file(path):
if os.path.exists(path):
logger.debug(_("Deleting image cache file '%s'"), path)
os.unlink(path)
else:
logger.warn(_("Cached image file '%s' doesn't exist, unable to"
" delete"), path)
def _make_namespaced_xattr_key(key, namespace='user'):
"""
Create a fully-qualified xattr-key by including the intended namespace.
Namespacing differs among OSes[1]:
FreeBSD: user, system
Linux: user, system, trusted, security
MacOS X: not needed
Mac OS X won't break if we include a namespace qualifier, so, for
simplicity, we always include it.
--
[1] http://en.wikipedia.org/wiki/Extended_file_attributes
"""
namespaced_key = ".".join([namespace, key])
return namespaced_key
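# For example, the 'hits' counter used above is stored under the
# fully-qualified attribute name 'user.hits':
assert _make_namespaced_xattr_key('hits') == 'user.hits'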
def get_xattr(path, key, **kwargs):
"""Return the value for a particular xattr
If the key doesn't exist, or xattrs aren't supported by the file
system, then a KeyError will be raised unless you specify a
default using kwargs.
"""
namespaced_key = _make_namespaced_xattr_key(key)
entry_xattr = xattr.xattr(path)
try:
return entry_xattr[namespaced_key]
except KeyError:
if 'default' in kwargs:
return kwargs['default']
else:
raise
def set_xattr(path, key, value):
"""Set the value of a specified xattr.
If xattrs aren't supported by the file-system, we skip setting the value.
"""
namespaced_key = _make_namespaced_xattr_key(key)
entry_xattr = xattr.xattr(path)
try:
entry_xattr.set(namespaced_key, str(value))
except IOError as e:
if e.errno == errno.EOPNOTSUPP:
logger.warn(_("xattrs not supported, skipping..."))
else:
raise
def inc_xattr(path, key, n=1):
"""
Increment the value of an xattr (assuming it is an integer).
BEWARE, this code *does* have a RACE CONDITION, since the
read/update/write sequence is not atomic.
Since the use-case for this function is collecting stats--not critical--
the benefits of simple, lock-free code outweigh the possibility of an
occasional hit not being counted.
"""
try:
count = int(get_xattr(path, key))
except KeyError:
# NOTE(sirp): a KeyError is generated in two cases:
# 1) xattrs is not supported by the filesystem
# 2) the key is not present on the file
#
# In either case, just ignore it...
pass
else:
# NOTE(sirp): only try to bump the count if xattrs is supported
# and the key is present
count += n
set_xattr(path, key, str(count))
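# Illustrative round-trip (not part of this patch) through the three
# helpers above. Requires a filesystem mounted with user_xattr; otherwise
# set_xattr() just logs a warning and get_xattr() falls back to the default:
import tempfile
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
set_xattr(tmp.name, 'hits', 0)
inc_xattr(tmp.name, 'hits', 1)
print get_xattr(tmp.name, 'hits', default='UNKNOWN')  # '1' when supported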

View File

@ -18,70 +18,68 @@
"""
Prefetches images into the Image Cache
"""
import logging
import os
import stat
import time
import eventlet
from glance.common import config
from glance.common import context
from glance.common import exception
from glance.image_cache import ImageCache
from glance import registry
from glance.store import get_from_backend
logger = logging.getLogger('glance.image_cache.prefetcher')
logger = logging.getLogger(__name__)
class Prefetcher(object):
def __init__(self, options):
self.options = options
self.cache = ImageCache(options)
registry.configure_registry_client(options)
def fetch_image_into_cache(self, image_id):
ctx = context.RequestContext(is_admin=True, show_deleted=True)
image_meta = registry.get_image_metadata(
self.options, ctx, image_id)
with self.cache.open(image_meta, "wb") as cache_file:
chunks = get_from_backend(image_meta['location'],
expected_size=image_meta['size'],
options=self.options)
for chunk in chunks:
cache_file.write(chunk)
try:
image_meta = registry.get_image_metadata(ctx, image_id)
if image_meta['status'] != 'active':
logger.warn(_("Image '%s' is not active. Not caching."),
image_id)
return False
except exception.NotFound:
logger.warn(_("No metadata found for image '%s'"), image_id)
return False
chunks = get_from_backend(image_meta['location'],
options=self.options)
logger.debug(_("Caching image '%s'"), image_id)
self.cache.cache_image_iter(image_id, chunks)
return True
def run(self):
if self.cache.is_currently_prefetching_any_images():
logger.debug(_("Currently prefetching, going back to sleep..."))
return
try:
image_id = self.cache.pop_prefetch_item()
except IndexError:
logger.debug(_("Nothing to prefetch, going back to sleep..."))
return
images = self.cache.get_cache_queue()
if not images:
logger.debug(_("Nothing to prefetch."))
return True
if self.cache.hit(image_id):
logger.warn(_("Image %s is already in the cache, deleting "
"prefetch job and going back to sleep..."), image_id)
self.cache.delete_queued_prefetch_image(image_id)
return
num_images = len(images)
logger.debug(_("Found %d images to prefetch"), num_images)
# NOTE(sirp): if someone is already downloading an image that is in
# the prefetch queue, then go ahead and delete that item and try to
# prefetch another
if self.cache.is_image_currently_being_written(image_id):
logger.warn(_("Image %s is already being cached, deleting "
"prefetch job and going back to sleep..."), image_id)
self.cache.delete_queued_prefetch_image(image_id)
return
pool = eventlet.GreenPool(num_images)
results = pool.imap(self.fetch_image_into_cache, images)
successes = sum([1 for r in results if r is True])
if successes != num_images:
logger.error(_("Failed to cache all "
"images in the queue."))
return False
logger.debug(_("Prefetching '%s'"), image_id)
self.cache.do_prefetch(image_id)
try:
self.fetch_image_into_cache(image_id)
finally:
self.cache.delete_prefetching_image(image_id)
logger.info(_("Successfully cached all %d images"), num_images)
return True
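# Minimal standalone illustration (not part of this patch) of the
# GreenPool/imap fan-out used in run() above; the worker simply echoes
# its argument instead of fetching an image into the cache:
import eventlet
pool = eventlet.GreenPool(3)
results = pool.imap(lambda image_id: image_id, ['1', '2', '3'])
assert list(results) == ['1', '2', '3']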
def app_factory(global_config, **local_conf):

View File

@ -18,15 +18,12 @@
"""
Prunes the Image Cache
"""
import logging
import os
import stat
import time
from glance.common import config
import logging
from glance.image_cache import ImageCache
logger = logging.getLogger('glance.image_cache.pruner')
logger = logging.getLogger(__name__)
class Pruner(object):
@ -34,79 +31,8 @@ class Pruner(object):
self.options = options
self.cache = ImageCache(options)
@property
def max_size(self):
default = 1 * 1024 * 1024 * 1024 # 1 GB
return config.get_option(
self.options, 'image_cache_max_size_bytes',
type='int', default=default)
@property
def percent_extra_to_free(self):
return config.get_option(
self.options, 'image_cache_percent_extra_to_free',
type='float', default=0.05)
def run(self):
self.prune_cache()
def prune_cache(self):
"""Prune the cache using an LRU strategy"""
# NOTE(sirp): 'Recency' is determined via the filesystem, first using
# atime (access time) and falling back to mtime (modified time).
#
# It has become more common to disable access-time updates by setting
# the `noatime` option for the filesystem. `noatime` is NOT compatible
# with this method.
#
# If `noatime` needs to be supported, we will need to persist access
# times elsewhere (either as a separate file, in the DB, or as
# an xattr).
def get_stats():
stats = []
for path in self.cache.get_all_regular_files(self.cache.path):
file_info = os.stat(path)
stats.append((file_info[stat.ST_ATIME], # access time
file_info[stat.ST_MTIME], # modification time
file_info[stat.ST_SIZE], # size in bytes
path)) # absolute path
return stats
def prune_lru(stats, to_free):
# Sort older access and modified times to the back
stats.sort(reverse=True)
freed = 0
while to_free > 0:
atime, mtime, size, path = stats.pop()
logger.debug(_("deleting '%(path)s' to free %(size)d B"),
locals())
os.unlink(path)
to_free -= size
freed += size
return freed
stats = get_stats()
# Check for overage
cur_size = sum(s[2] for s in stats)
max_size = self.max_size
logger.debug(_("cur_size=%(cur_size)d B max_size=%(max_size)d B"),
locals())
if cur_size <= max_size:
logger.debug(_("cache has free space, skipping prune..."))
return
overage = cur_size - max_size
extra = max_size * self.percent_extra_to_free
to_free = overage + extra
logger.debug(_("overage=%(overage)d B extra=%(extra)d B"
" total=%(to_free)d B"), locals())
freed = prune_lru(stats, to_free)
logger.debug(_("finished pruning, freed %(freed)d bytes"), locals())
self.cache.prune()
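# Worked example (not part of this patch) of the sizing math that
# ImageCache.prune() now encapsulates, using the 1 GB default and 5%
# extra-to-free from above with an assumed 1.2 GB of cached data:
max_size = 1 * 1024 ** 3                # 1073741824 B budget
cur_size = int(1.2 * max_size)          # 1288490188 B currently cached
overage = cur_size - max_size           # 214748364 B over budget
extra = int(max_size * 0.05)            # 53687091 B of extra headroom
to_free = overage + extra               # 268435455 B pruned in total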
def app_factory(global_config, **local_conf):

View File

@ -148,8 +148,6 @@ class ApiServer(Server):
self.default_store = 'file'
self.key_file = ""
self.cert_file = ""
self.image_cache_datadir = os.path.join(self.test_dir,
'cache')
self.image_dir = os.path.join(self.test_dir,
"images")
self.pid_file = os.path.join(self.test_dir,
@ -175,6 +173,9 @@ class ApiServer(Server):
self.delayed_delete = delayed_delete
self.owner_is_tenant = True
self.cache_pipeline = "" # Set to cache for cache middleware
self.image_cache_dir = os.path.join(self.test_dir,
'cache')
self.image_cache_driver = 'sqlite'
self.conf_base = """[DEFAULT]
verbose = %(verbose)s
debug = %(debug)s
@ -205,7 +206,8 @@ delayed_delete = %(delayed_delete)s
owner_is_tenant = %(owner_is_tenant)s
scrub_time = 5
scrubber_datadir = %(scrubber_datadir)s
image_cache_datadir = %(image_cache_datadir)s
image_cache_dir = %(image_cache_dir)s
image_cache_driver = %(image_cache_driver)s
[pipeline:glance-api]
pipeline = versionnegotiation context %(cache_pipeline)s apiv1app

View File

@ -0,0 +1,182 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests a Glance API server which uses the caching middleware that
uses the default SQLite cache driver. We use the filesystem store,
but that is really not relevant, as the image cache is transparent
to the backend store.
"""
import hashlib
import json
import os
import shutil
import time
import httplib2
from glance.tests import functional
from glance.tests.utils import skip_if_disabled
FIVE_KB = 5 * 1024
class BaseCacheMiddlewareTest(object):
@skip_if_disabled
def test_cache_middleware_transparent(self):
"""
We test that putting the cache middleware into the
application pipeline gives us transparent image caching
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
api_port = self.api_port
registry_port = self.registry_port
# Verify no image 1
path = "http://%s:%d/v1/images/1" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 404)
# Add an image and verify a 200 OK is returned
image_data = "*" * FIVE_KB
headers = {'Content-Type': 'application/octet-stream',
'X-Image-Meta-Name': 'Image1',
'X-Image-Meta-Is-Public': 'True'}
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = json.loads(content)
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# Verify image not in cache
image_cached_path = os.path.join(self.api_server.image_cache_dir,
'1')
self.assertFalse(os.path.exists(image_cached_path))
# Grab the image
path = "http://%s:%d/v1/images/1" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
# Verify image now in cache
image_cached_path = os.path.join(self.api_server.image_cache_dir,
'1')
# You might wonder why the heck this is here... well, it's here
# because it took me forever to figure out that the disk write
# cache in Linux was causing random failures of the os.path.exists
# assert directly below this. Basically, since the cache writes
# the image file to disk in a different process, the write buffers
# may not be flushed by the time the os.rename() completes, resulting
# in a false negative on the file existence check below. This little
# loop pauses the execution of this process for no more than 1.5
# seconds. If after that time the cached image file still doesn't
# appear on disk, something really is wrong, and the assert should
# trigger...
i = 0
while not os.path.exists(image_cached_path) and i < 30:
time.sleep(0.05)
i = i + 1
self.assertTrue(os.path.exists(image_cached_path))
self.stop_servers()
class TestImageCacheXattr(functional.FunctionalTest,
BaseCacheMiddlewareTest):
"""Functional tests that exercise the image cache using the xattr driver"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-xattr installed and xattr support on the
filesystem)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import xattr
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-xattr not installed.")
return
self.inited = True
self.disabled = False
self.cache_pipeline = "cache"
self.image_cache_driver = "xattr"
super(TestImageCacheXattr, self).setUp()
def tearDown(self):
if os.path.exists(self.api_server.image_cache_dir):
shutil.rmtree(self.api_server.image_cache_dir)
class TestImageCacheSqlite(functional.FunctionalTest,
BaseCacheMiddlewareTest):
"""
Functional tests that exercise the image cache using the
SQLite driver
"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-sqlite3 installed)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import sqlite3
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-sqlite3 not installed.")
return
self.inited = True
self.disabled = False
self.cache_pipeline = "cache"
super(TestImageCacheSqlite, self).setUp()
def tearDown(self):
if os.path.exists(self.api_server.image_cache_dir):
shutil.rmtree(self.api_server.image_cache_dir)

View File

@ -1,94 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests a Glance API server which uses the caching middleware. We
use the filesystem store, but that is really not relevant, as the
image cache is transparent to the backend store.
"""
import hashlib
import json
import os
import unittest
import httplib2
from glance.tests.functional import test_api
from glance.tests.utils import execute, skip_if_disabled
FIVE_KB = 5 * 1024
class TestImageCache(test_api.TestApi):
"""Functional tests that exercise the image cache"""
@skip_if_disabled
def test_cache_middleware_transparent(self):
"""
We test that putting the cache middleware into the
application pipeline gives us transparent image caching
"""
self.cleanup()
self.cache_pipeline = "cache"
self.start_servers(**self.__dict__.copy())
api_port = self.api_port
registry_port = self.registry_port
# Verify no image 1
path = "http://%s:%d/v1/images/1" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'HEAD')
self.assertEqual(response.status, 404)
# Add an image and verify a 200 OK is returned
image_data = "*" * FIVE_KB
headers = {'Content-Type': 'application/octet-stream',
'X-Image-Meta-Name': 'Image1',
'X-Image-Meta-Is-Public': 'True'}
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = json.loads(content)
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
# Verify image not in cache
image_cached_path = os.path.join(self.api_server.image_cache_datadir,
'1')
self.assertFalse(os.path.exists(image_cached_path))
# Grab the image
path = "http://%s:%d/v1/images/1" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
# Verify image now in cache
image_cached_path = os.path.join(self.api_server.image_cache_datadir,
'1')
self.assertTrue(os.path.exists(image_cached_path))
self.stop_servers()

View File

@ -17,27 +17,17 @@
"""Stubouts, mocks and fixtures for the test suite"""
import datetime
import httplib
import operator
import os
import shutil
import StringIO
import sys
import stubout
import webob
from glance.api import v1 as server
from glance.api.middleware import version_negotiation
import glance.common.client
from glance.common import context
from glance.common import exception
from glance.registry.api import v1 as rserver
from glance.api import v1 as server
from glance.api.middleware import version_negotiation
import glance.store
import glance.store.filesystem
import glance.store.http
import glance.registry.db.api
FAKE_FILESYSTEM_ROOTDIR = os.path.join('/tmp', 'glance-tests')
@ -199,7 +189,7 @@ def stub_out_registry_and_store_server(stubs):
fake_image_iter)
def stub_out_registry_server(stubs):
def stub_out_registry_server(stubs, **kwargs):
"""
Mocks calls to 127.0.0.1 on 9191 for testing so
that a real Glance Registry server does not need to be up and
@ -226,8 +216,7 @@ def stub_out_registry_server(stubs):
self.req.body = body
def getresponse(self):
sql_connection = os.environ.get('GLANCE_SQL_CONNECTION',
"sqlite:///")
sql_connection = kwargs.get('sql_connection', "sqlite:///")
context_class = 'glance.registry.context.RequestContext'
options = {'sql_connection': sql_connection, 'verbose': VERBOSE,
'debug': DEBUG, 'context_class': context_class}

View File

@ -1,114 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import os
import random
import shutil
import unittest
import stubout
import webob
from glance import registry
from glance.api import v1 as server
from glance.api.middleware import cache
from glance.common import context
from glance.tests import stubs
FIXTURE_DATA = '*' * 1024
class TestCacheMiddleware(unittest.TestCase):
"""Test case for the cache middleware"""
def setUp(self):
self.cache_dir = os.path.join("/", "tmp", "test.cache.%d" %
random.randint(0, 1000000))
self.filesystem_store_datadir = os.path.join(self.cache_dir,
'filestore')
self.options = {
'verbose': True,
'debug': True,
'image_cache_datadir': self.cache_dir,
'registry_host': '0.0.0.0',
'registry_port': 9191,
'default_store': 'file',
'filesystem_store_datadir': self.filesystem_store_datadir
}
self.cache_filter = cache.CacheFilter(
server.API(self.options), self.options)
self.api = context.ContextMiddleware(self.cache_filter, self.options)
self.stubs = stubout.StubOutForTesting()
stubs.stub_out_registry_server(self.stubs)
def tearDown(self):
self.stubs.UnsetAll()
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
def test_cache_image(self):
"""
Verify no images cached at start, then request an image,
and verify the image is in the cache afterwards
"""
image_cached_path = os.path.join(self.cache_dir, '1')
self.assertFalse(os.path.exists(image_cached_path))
req = webob.Request.blank('/images/1')
res = req.get_response(self.api)
self.assertEquals(404, res.status_int)
fixture_headers = {'x-image-meta-store': 'file',
'x-image-meta-disk-format': 'vhd',
'x-image-meta-container-format': 'ovf',
'x-image-meta-name': 'fake image #1'}
req = webob.Request.blank("/images")
req.method = 'POST'
for k, v in fixture_headers.iteritems():
req.headers[k] = v
req.headers['Content-Type'] = 'application/octet-stream'
req.body = FIXTURE_DATA
res = req.get_response(self.api)
self.assertEquals(res.status_int, httplib.CREATED)
req = webob.Request.blank('/images/1')
res = req.get_response(self.api)
self.assertEquals(200, res.status_int)
for chunk in res.body:
pass # We do this to trigger tee'ing the file
self.assertTrue(os.path.exists(image_cached_path))
self.assertEqual(0, self.cache_filter.cache.get_hit_count('1'))
# Now verify that the next call to GET /images/1
# yields the image from the cache...
req = webob.Request.blank('/images/1')
res = req.get_response(self.api)
self.assertEquals(200, res.status_int)
for chunk in res.body:
pass # We do this to trigger a hit read
self.assertTrue(os.path.exists(image_cached_path))
self.assertEqual(1, self.cache_filter.cache.get_hit_count('1'))

View File

@ -21,112 +21,45 @@ import shutil
import StringIO
import unittest
import stubout
from glance import image_cache
from glance.image_cache import prefetcher
from glance.common import exception
from glance.tests import stubs
from glance.tests.utils import skip_if_disabled
FIXTURE_DATA = '*' * 1024
class TestImageCache(unittest.TestCase):
def setUp(self):
self.cache_dir = os.path.join("/", "tmp", "test.cache.%d" %
random.randint(0, 1000000))
self.options = {'image_cache_datadir': self.cache_dir}
self.cache = image_cache.ImageCache(self.options)
class ImageCacheTestCase(object):
def tearDown(self):
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
def test_auto_properties(self):
@skip_if_disabled
def test_is_cached(self):
"""
Test that the auto-assigned properties are correct
Verify is_cached(1) returns False, then add something to the cache
and verify is_cached(1) returns True.
"""
self.assertEqual(self.cache.path, self.cache_dir)
self.assertEqual(self.cache.invalid_path,
os.path.join(self.cache_dir,
'invalid'))
self.assertEqual(self.cache.incomplete_path,
os.path.join(self.cache_dir,
'incomplete'))
self.assertEqual(self.cache.prefetch_path,
os.path.join(self.cache_dir,
'prefetch'))
self.assertEqual(self.cache.prefetching_path,
os.path.join(self.cache_dir,
'prefetching'))
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
def test_hit(self):
"""
Verify hit(1) returns 0, then add something to the cache
and verify hit(1) returns 1.
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
self.assertFalse(self.cache.is_cached(1))
self.assertFalse(self.cache.hit(1))
self.assertTrue(self.cache.cache_image_file(1, FIXTURE_FILE))
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
self.assertTrue(self.cache.hit(1))
def test_bad_open_mode(self):
"""
Test than an exception is raised if attempting to open
the cache file context manager with an invalid mode string
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
bad_modes = ('xb', 'wa', 'rw')
for mode in bad_modes:
exc_raised = False
try:
with self.cache.open(meta, 'xb') as cache_file:
cache_file.write(FIXTURE_DATA)
except:
exc_raised = True
self.assertTrue(exc_raised,
'Using mode %s, failed to raise exception.' % mode)
self.assertTrue(self.cache.is_cached(1))
@skip_if_disabled
def test_read(self):
"""
Verify hit(1) returns 0, then add something to the cache
Verify is_cached(1) returns False, then add something to the cache
and verify after a subsequent read from the cache that
hit(1) returns 1.
is_cached(1) returns True.
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.assertFalse(self.cache.hit(1))
self.assertFalse(self.cache.is_cached(1))
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
buff = StringIO.StringIO()
with self.cache.open(meta, 'rb') as cache_file:
for chunk in cache_file:
buff.write(chunk)
self.assertEqual(FIXTURE_DATA, buff.getvalue())
def test_open_for_read(self):
"""
Test convenience wrapper for opening a cache file via
its image identifier.
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
self.assertFalse(self.cache.hit(1))
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
self.assertTrue(self.cache.cache_image_file(1, FIXTURE_FILE))
buff = StringIO.StringIO()
with self.cache.open_for_read(1) as cache_file:
@ -135,102 +68,238 @@ class TestImageCache(unittest.TestCase):
self.assertEqual(FIXTURE_DATA, buff.getvalue())
def test_purge(self):
@skip_if_disabled
def test_open_for_read(self):
"""
Test purge method that removes an image from the cache
Test convenience wrapper for opening a cache file via
its image identifier.
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.assertFalse(self.cache.hit(1))
self.assertFalse(self.cache.is_cached(1))
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
self.assertTrue(self.cache.cache_image_file(1, FIXTURE_FILE))
self.assertTrue(self.cache.hit(1))
buff = StringIO.StringIO()
with self.cache.open_for_read(1) as cache_file:
for chunk in cache_file:
buff.write(chunk)
self.cache.purge(1)
self.assertEqual(FIXTURE_DATA, buff.getvalue())
self.assertFalse(self.cache.hit(1))
def test_clear(self):
@skip_if_disabled
def test_delete(self):
"""
Test purge method that removes an image from the cache
Test delete method that removes an image from the cache
"""
metas = [
{'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)},
{'id': 2,
'name': 'Image2',
'size': len(FIXTURE_DATA)}]
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.assertFalse(self.cache.is_cached(1))
self.assertTrue(self.cache.cache_image_file(1, FIXTURE_FILE))
self.assertTrue(self.cache.is_cached(1))
self.cache.delete(1)
self.assertFalse(self.cache.is_cached(1))
@skip_if_disabled
def test_delete_all(self):
"""
Test delete method that removes an image from the cache
"""
for image_id in (1, 2):
self.assertFalse(self.cache.is_cached(image_id))
for image_id in (1, 2):
self.assertFalse(self.cache.hit(image_id))
for meta in metas:
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.assertTrue(self.cache.cache_image_file(image_id,
FIXTURE_FILE))
for image_id in (1, 2):
self.assertTrue(self.cache.hit(image_id))
self.assertTrue(self.cache.is_cached(image_id))
self.cache.clear()
self.cache.delete_all()
for image_id in (1, 2):
self.assertFalse(self.cache.hit(image_id))
self.assertFalse(self.cache.is_cached(image_id))
def test_prefetch(self):
@skip_if_disabled
def test_prune(self):
"""
Test that queueing for prefetch and prefetching works properly
Test that pruning the cache works as expected...
"""
meta = {'id': 1,
'name': 'Image1',
'size': len(FIXTURE_DATA)}
self.assertEqual(0, self.cache.get_cache_size())
self.assertFalse(self.cache.hit(1))
# Add a bunch of images to the cache. The max cache
# size for the cache is set to 5KB and each image is
# 1K. We add 10 images to the cache and then we'll
# prune it. We should see only 5 images left after
# pruning, and the images that are least recently accessed
# should be the ones pruned...
for x in xrange(0, 10):
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.assertTrue(self.cache.cache_image_file(x,
FIXTURE_FILE))
self.cache.queue_prefetch(meta)
self.assertEqual(10 * 1024, self.cache.get_cache_size())
self.assertFalse(self.cache.hit(1))
# OK, hit the images that are now cached...
for x in xrange(0, 10):
buff = StringIO.StringIO()
with self.cache.open_for_read(x) as cache_file:
for chunk in cache_file:
buff.write(chunk)
# Test that an exception is raised if we try to queue the
# same image for prefetching
self.assertRaises(exception.Invalid, self.cache.queue_prefetch,
meta)
self.cache.prune()
self.cache.delete_queued_prefetch_image(1)
self.assertEqual(5 * 1024, self.cache.get_cache_size())
self.assertFalse(self.cache.hit(1))
for x in xrange(0, 5):
self.assertFalse(self.cache.is_cached(x),
"Image %s was cached!" % x)
# Test that an exception is raised if we try to queue for
# prefetching an image that has already been cached
for x in xrange(5, 10):
self.assertTrue(self.cache.is_cached(x),
"Image %s was not cached!" % x)
with self.cache.open(meta, 'wb') as cache_file:
cache_file.write(FIXTURE_DATA)
@skip_if_disabled
def test_queue(self):
"""
Test that queueing works properly
"""
self.assertTrue(self.cache.hit(1))
self.assertFalse(self.cache.is_cached(1))
self.assertFalse(self.cache.is_queued(1))
self.assertRaises(exception.Invalid, self.cache.queue_prefetch,
meta)
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
self.cache.purge(1)
self.assertTrue(self.cache.queue_image(1))
# We can't prefetch an image that has not been queued
# for prefetching
self.assertRaises(OSError, self.cache.do_prefetch, 1)
self.assertTrue(self.cache.is_queued(1))
self.assertFalse(self.cache.is_cached(1))
self.cache.queue_prefetch(meta)
# Should not return True if the image is already
# queued for caching...
self.assertFalse(self.cache.queue_image(1))
self.assertTrue(self.cache.is_image_queued_for_prefetch(1))
self.assertFalse(self.cache.is_cached(1))
self.assertFalse(self.cache.is_currently_prefetching_any_images())
self.assertFalse(self.cache.is_image_currently_prefetching(1))
# Test that we return False if we try to queue
# an image that has already been cached
self.assertEqual(str(1), self.cache.pop_prefetch_item())
self.assertTrue(self.cache.cache_image_file(1, FIXTURE_FILE))
self.cache.do_prefetch(1)
self.assertFalse(self.cache.is_image_queued_for_prefetch(1))
self.assertTrue(self.cache.is_currently_prefetching_any_images())
self.assertTrue(self.cache.is_image_currently_prefetching(1))
self.assertFalse(self.cache.is_queued(1))
self.assertTrue(self.cache.is_cached(1))
self.assertFalse(self.cache.queue_image(1))
self.cache.delete(1)
for x in xrange(0, 3):
self.assertTrue(self.cache.queue_image(x))
self.assertEqual(self.cache.get_cache_queue(),
['0', '1', '2'])
@skip_if_disabled
def test_prefetcher(self):
"""
Test that the prefetcher application works
"""
stubs.stub_out_registry_server(self.stubs)
FIXTURE_FILE = StringIO.StringIO(FIXTURE_DATA)
# Should return True since there is nothing in the queue
pf = prefetcher.Prefetcher(self.options)
self.assertTrue(pf.run())
for x in xrange(2, 3):
self.assertTrue(self.cache.queue_image(x))
# Should return False since there is no metadata for these
# images in the registry
self.assertFalse(pf.run())
class TestImageCacheXattr(unittest.TestCase,
ImageCacheTestCase):
"""Tests image caching when xattr is used in cache"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-xattr installed and xattr support on the
filesystem)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import xattr
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-xattr not installed.")
return
self.inited = True
self.disabled = False
self.cache_dir = os.path.join("/", "tmp", "test.cache.%d" %
random.randint(0, 1000000))
self.options = {'image_cache_dir': self.cache_dir,
'image_cache_driver': 'xattr',
'image_cache_max_size': 1024 * 5,
'registry_host': '0.0.0.0',
'registry_port': 9191}
self.cache = image_cache.ImageCache(self.options)
self.stubs = stubout.StubOutForTesting()
def tearDown(self):
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
self.stubs.UnsetAll()
class TestImageCacheSqlite(unittest.TestCase,
ImageCacheTestCase):
"""Tests image caching when SQLite is used in cache"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-sqlite3 installed)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import sqlite3
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-sqlite3 not installed.")
return
self.inited = True
self.disabled = False
self.cache_dir = os.path.join("/", "tmp", "test.cache.%d" %
random.randint(0, 1000000))
self.options = {'image_cache_dir': self.cache_dir,
'image_cache_driver': 'sqlite',
'image_cache_max_size': 1024 * 5,
'registry_host': '0.0.0.0',
'registry_port': 9191}
self.cache = image_cache.ImageCache(self.options)
self.stubs = stubout.StubOutForTesting()
def tearDown(self):
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
self.stubs.UnsetAll()

View File

@ -81,7 +81,7 @@ def skip_if_disabled(func):
test_obj = a[0]
message = getattr(test_obj, 'disabled_message',
'Test disabled')
if test_obj.disabled:
if getattr(test_obj, 'disabled', False):
raise nose.SkipTest(message)
func(*a, **kwargs)
return wrapped

View File

@ -18,10 +18,8 @@
"""
A few utility routines used throughout Glance
"""
import errno
import logging
import xattr
import logging
logger = logging.getLogger('glance.utils')
@ -167,82 +165,3 @@ class PrettyTable(object):
justified = clipped_data.ljust(width)
return justified
def _make_namespaced_xattr_key(key, namespace='user'):
"""Create a fully-qualified xattr-key by including the intended namespace.
Namespacing differs among OSes[1]:
FreeBSD: user, system
Linux: user, system, trusted, security
MacOS X: not needed
Mac OS X won't break if we include a namespace qualifier, so, for
simplicity, we always include it.
--
[1] http://en.wikipedia.org/wiki/Extended_file_attributes
"""
namespaced_key = ".".join([namespace, key])
return namespaced_key
def get_xattr(path, key, **kwargs):
"""Return the value for a particular xattr
If the key doesn't exist, or xattrs aren't supported by the file
system, then a KeyError will be raised unless you specify a
default using kwargs.
"""
namespaced_key = _make_namespaced_xattr_key(key)
entry_xattr = xattr.xattr(path)
try:
return entry_xattr[namespaced_key]
except KeyError:
if 'default' in kwargs:
return kwargs['default']
else:
raise
def set_xattr(path, key, value):
"""Set the value of a specified xattr.
If xattrs aren't supported by the file-system, we skip setting the value.
"""
namespaced_key = _make_namespaced_xattr_key(key)
entry_xattr = xattr.xattr(path)
try:
entry_xattr.set(namespaced_key, str(value))
except IOError as e:
if e.errno == errno.EOPNOTSUPP:
logger.warn(_("xattrs not supported, skipping..."))
else:
raise
def inc_xattr(path, key, n=1):
"""Increment the value of an xattr (assuming it is an integer).
BEWARE, this code *does* have a RACE CONDITION, since the
read/update/write sequence is not atomic.
Since the use-case for this function is collecting stats--not critical--
the benefits of simple, lock-free code outweigh the possibility of an
occasional hit not being counted.
"""
try:
count = int(get_xattr(path, key))
except KeyError:
# NOTE(sirp): a KeyError is generated in two cases:
# 1) xattrs is not supported by the filesystem
# 2) the key is not present on the file
#
# In either case, just ignore it...
pass
else:
# NOTE(sirp): only try to bump the count if xattrs is supported
# and the key is present
count += n
set_xattr(path, key, str(count))

View File

@ -121,7 +121,7 @@ setup(
'bin/glance-api',
'bin/glance-cache-prefetcher',
'bin/glance-cache-pruner',
'bin/glance-cache-reaper',
'bin/glance-cache-cleaner',
'bin/glance-control',
'bin/glance-manage',
'bin/glance-registry',