More cache refactoring - Management Middleware

Fixes LP Bug#883236 - Cache management needs refactoring

Refactors cache management middleware by simplifying
the middleware to image cache calls. The middleware
now only contains calls for retrieving records of
which images are cached, queueing an image to be
cached, deleting a single cached image, and deleting
all cached images. I removed the special calls for
reaping invalid or stalled images, as that is handled
by the cache cleaner utility...

Adds functional test case with caching and cache
management middleware enabled.

Made a slight change to the way the functional test
creates a config file: now it adds the configuration
files used to start servers to $TEST_DIR/etc/$SERVER.conf
so that test runner can see the config files that
caused failures for tests.

TODO: Add test case for queue, then run prefetcher, then
check image is cached...
TODO: Add docs for cache management API
TODO: Make bin/glance gracefully handle cache not existing

Change-Id: I76e95ffeeabc079b48b5ce2aeb6520cbdbf6d1e1
This commit is contained in:
Jay Pipes 2011-10-28 17:01:38 -04:00
parent 6970aa883a
commit 1ccc0e2f13
8 changed files with 434 additions and 76 deletions

View File

@ -19,10 +19,8 @@
Controller for Image Cache Management API
"""
import httplib
import json
import logging
import webob.dec
import webob.exc
from glance.common import exception
@ -31,6 +29,8 @@ from glance import api
from glance import image_cache
from glance import registry
logger = logging.getLogger(__name__)
class Controller(api.BaseController):
"""
@ -42,45 +42,39 @@ class Controller(api.BaseController):
self.cache = image_cache.ImageCache(self.options)
def index(self, req):
status = req.str_params.get('status')
if status == 'invalid':
entries = list(self.cache.invalid_entries())
elif status == 'incomplete':
entries = list(self.cache.incomplete_entries())
elif status == 'prefetching':
entries = list(self.cache.prefetch_entries())
else:
entries = list(self.cache.entries())
"""
GET /cached_images
return dict(cached_images=entries)
Returns a mapping of records about cached images.
"""
images = self.cache.get_cached_images()
return dict(cached_images=images)
def delete(self, req, id):
self.cache.purge(id)
"""
DELETE /cached_images/1
Removes an image from the cache.
"""
self.cache.delete(id)
def delete_collection(self, req):
"""
DELETE /cached_images - Clear all active cached images
DELETE /cached_images?status=invalid - Reap invalid cached images
DELETE /cached_images?status=incomplete - Reap stalled cached images
Removes all images from the cache.
"""
status = req.str_params.get('status')
if status == 'invalid':
num_reaped = self.cache.reap_invalid()
return dict(num_reaped=num_reaped)
elif status == 'incomplete':
num_reaped = self.cache.reap_stalled()
return dict(num_reaped=num_reaped)
else:
num_purged = self.cache.clear()
return dict(num_purged=num_purged)
self.cache.delete_all()
def update(self, req, id):
"""PUT /cached_images/1 is used to prefetch an image into the cache"""
image_meta = self.get_active_image_meta_or_404(req, id)
try:
self.cache.queue_prefetch(image_meta)
except exception.Invalid, e:
raise webob.exc.HTTPBadRequest(explanation="%s" % e)
"""
PUT /cached_images/1
Queues an image for caching. We do not check to see if
the image is in the registry here. That is done by the
prefetcher...
"""
self.cache.queue_image(image_id)
class CachedImageDeserializer(wsgi.JSONRequestDeserializer):

View File

@ -24,25 +24,25 @@ import logging
from glance.api import cached_images
from glance.common import wsgi
logger = logging.getLogger('glance.api.middleware.image_cache')
logger = logging.getLogger(__name__)
class CacheManageFilter(wsgi.Middleware):
def __init__(self, app, options):
super(CacheManageFilter, self).__init__(app)
map = app.map
resource = cached_images.create_resource(options)
map.resource("cached_image", "cached_images",
controller=resource,
collection={'reap_invalid': 'POST',
'reap_stalled': 'POST'})
controller=resource)
map.connect("/cached_images",
controller=resource,
action="delete_collection",
conditions=dict(method=["DELETE"]))
logger.info(_("Initialized image cache management middleware"))
super(CacheManageFilter, self).__init__(app)
def filter_factory(global_conf, **local_conf):
"""

View File

@ -107,6 +107,12 @@ class ImageCache(object):
"""
return self.driver.get_hit_count(image_id)
def get_cached_images(self):
"""
Returns a list of records about cached images.
"""
return self.driver.get_cached_images()
def delete_all(self):
"""
Removes all cached image files and any attributes about the images

View File

@ -49,6 +49,25 @@ class Driver(object):
"""
raise NotImplementedError
def get_cached_images(self):
"""
Returns a list of records about cached images.
The list of records shall be ordered by image ID and shall look like::
[
{
'image_id': <IMAGE_ID>,
'hits': INTEGER,
'last_modified': ISO_TIMESTAMP,
'last_accessed': ISO_TIMESTAMP,
'size': INTEGER
}, ...
]
"""
return NotImplementedError
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image

View File

@ -71,6 +71,11 @@ class SqliteConnection(sqlite3.Connection):
return self._timeout(lambda: sqlite3.Connection.commit(self))
def dict_factory(cur, row):
return dict(
((col[0], row[idx]) for idx, col in enumerate(cur.description)))
class Driver(base.Driver):
"""
@ -116,6 +121,7 @@ class Driver(base.Driver):
CREATE TABLE IF NOT EXISTS cached_images (
image_id TEXT PRIMARY KEY,
last_access REAL DEFAULT 0.0,
last_modified REAL DEFAULT 0.0,
size INTEGER DEFAULT 0,
hits INTEGER DEFAULT 0,
checksum TEXT
@ -140,6 +146,36 @@ class Driver(base.Driver):
sizes.append(file_info[stat.ST_SIZE])
return sum(sizes)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
hits = 0
with self.get_db() as db:
cur = db.execute("""SELECT hits FROM cached_images
WHERE image_id = ?""",
(image_id,))
hits = cur.fetchone()[0]
return hits
def get_cached_images(self):
"""
Returns a list of records about cached images.
"""
logger.debug(_("Gathering cached image entries."))
with self.get_db() as db:
cur = db.execute("""SELECT
image_id, hits, last_access, last_modified, size
FROM cached_images
ORDER BY image_id""")
cur.row_factory = dict_factory
return [r for r in cur]
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
@ -179,23 +215,6 @@ class Driver(base.Driver):
path = self.get_image_filepath(image_id, 'queue')
return os.path.exists(path)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
hits = 0
with self.get_db() as db:
cur = db.execute("""SELECT hits FROM cached_images
WHERE image_id = ?""",
(image_id,))
hits = cur.fetchone()[0]
return hits
def delete_all(self):
"""
Removes all cached image files and any attributes about the images
@ -273,9 +292,13 @@ class Driver(base.Driver):
if self.is_queued(image_id):
os.unlink(self.get_image_filepath(image_id, 'queue'))
filesize = os.path.getsize(final_path)
now = time.time()
db.execute("""INSERT INTO cached_images
(image_id, last_access, hits)
VALUES (?, 0, 0)""", (image_id, ))
(image_id, last_access, last_modified, hits, size)
VALUES (?, 0, ?, 0, ?)""",
(image_id, now, filesize))
db.commit()
def rollback(e):

View File

@ -117,6 +117,42 @@ class Driver(base.Driver):
sizes.append(file_info[stat.ST_SIZE])
return sum(sizes)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
path = self.get_image_filepath(image_id)
return int(get_xattr(path, 'hits', default=0))
def get_cached_images(self):
"""
Returns a list of records about cached images.
"""
logger.debug(_("Gathering cached image entries."))
entries = []
for path in get_all_regular_files(self.base_dir):
image_id = os.path.basename(path)
entry = {}
entry['image_id'] = image_id
file_info = os.stat(path)
entry['last_modified'] = iso8601_from_timestamp(
file_info[stat.ST_MTIME])
entry['last_accessed'] = iso8601_from_timestamp(
file_info[stat.ST_ATIME])
entry['size'] = file_info[stat.ST_SIZE]
entry['hits'] = self.get_hit_count(image_id)
entries.append(entry)
entries.sort() # Order by ID
return entries
def is_cached(self, image_id):
"""
Returns True if the image with the supplied ID has its image
@ -154,18 +190,6 @@ class Driver(base.Driver):
path = self.get_image_filepath(image_id, 'queue')
return os.path.exists(path)
def get_hit_count(self, image_id):
"""
Return the number of hits that an image has.
:param image_id: Opaque image identifier
"""
if not self.is_cached(image_id):
return 0
path = self.get_image_filepath(image_id)
return int(get_xattr(path, 'hits', default=0))
def delete_all(self):
"""
Removes all cached image files and any attributes about the images
@ -525,3 +549,7 @@ def inc_xattr(path, key, n=1):
# and the key is present
count += n
set_xattr(path, key, str(count))
def iso8601_from_timestamp(timestamp):
return datetime.datetime.utcfromtimestamp(timestamp).isoformat()

View File

@ -30,11 +30,11 @@ import random
import shutil
import signal
import socket
import tempfile
import time
import unittest
import urlparse
from glance.common import utils
from glance.tests.utils import execute, get_unused_port
from sqlalchemy import create_engine
@ -79,7 +79,7 @@ class Server(object):
self.no_venv = False
self.test_dir = test_dir
self.bind_port = port
self.conf_file = None
self.conf_file_name = None
self.conf_base = None
self.server_control = './bin/glance-control'
self.exec_env = None
@ -90,7 +90,7 @@ class Server(object):
destination. Returns the name of the configuration file.
"""
if self.conf_file:
if self.conf_file_name:
return self.conf_file_name
if not self.conf_base:
raise RuntimeError("Subclass did not populate config_base!")
@ -102,11 +102,13 @@ class Server(object):
# A config file to use just for this test...we don't want
# to trample on currently-running Glance servers, now do we?
conf_file = tempfile.NamedTemporaryFile()
conf_file.write(self.conf_base % conf_override)
conf_file.flush()
self.conf_file = conf_file
self.conf_file_name = conf_file.name
conf_dir = os.path.join(self.test_dir, 'etc')
conf_filepath = os.path.join(conf_dir, "%s.conf" % self.server_name)
utils.safe_mkdirs(conf_dir)
with open(conf_filepath, 'wb') as conf_file:
conf_file.write(self.conf_base % conf_override)
conf_file.flush()
self.conf_file_name = conf_file.name
return self.conf_file_name
@ -227,6 +229,9 @@ paste.filter_factory = glance.api.middleware.version_negotiation:filter_factory
[filter:cache]
paste.filter_factory = glance.api.middleware.cache:filter_factory
[filter:cache_manage]
paste.filter_factory = glance.api.middleware.cache_manage:filter_factory
[filter:context]
paste.filter_factory = glance.common.context:filter_factory
"""

View File

@ -107,6 +107,214 @@ class BaseCacheMiddlewareTest(object):
self.stop_servers()
class BaseCacheManageMiddlewareTest(object):
"""Base test class for testing cache management middleware"""
def verify_no_images(self):
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('images' in data)
self.assertEqual(0, len(data['images']))
@skip_if_disabled
def test_cache_manage_get_cached_images(self):
"""
Tests that cached images are queryable
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
api_port = self.api_port
registry_port = self.registry_port
self.verify_no_images()
# Add an image and verify a 200 OK is returned
image_data = "*" * FIVE_KB
headers = {'Content-Type': 'application/octet-stream',
'X-Image-Meta-Name': 'Image1',
'X-Image-Meta-Is-Public': 'True'}
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = json.loads(content)
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertEqual(data['image']['is_public'], True)
image_id = data['image']['id']
# Verify image does not yet show up in cache (we haven't "hit"
# it yet using a GET /images/1 ...
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
self.assertEqual(data['cached_images'], [])
# Grab the image
path = "http://%s:%d/v1/images/%s" % ("0.0.0.0", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
# Verify image now in cache
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
cached_images = data['cached_images']
self.assertEqual(1, len(cached_images))
self.assertEqual(image_id, cached_images[0]['image_id'])
self.assertEqual(0, cached_images[0]['hits'])
# Hit the image
path = "http://%s:%d/v1/images/%s" % ("0.0.0.0", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
# Verify image hits increased in output of manage GET
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
cached_images = data['cached_images']
self.assertEqual(1, len(cached_images))
self.assertEqual(image_id, cached_images[0]['image_id'])
self.assertEqual(1, cached_images[0]['hits'])
self.stop_servers()
@skip_if_disabled
def test_cache_manage_delete_cached_images(self):
"""
Tests that cached images may be deleted
"""
self.cleanup()
self.start_servers(**self.__dict__.copy())
api_port = self.api_port
registry_port = self.registry_port
self.verify_no_images()
ids = {}
# Add a bunch of images...
for x in xrange(0, 4):
image_data = "*" * FIVE_KB
headers = {'Content-Type': 'application/octet-stream',
'X-Image-Meta-Name': 'Image%s' % x,
'X-Image-Meta-Is-Public': 'True'}
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(response.status, 201)
data = json.loads(content)
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image%s" % x)
self.assertEqual(data['image']['is_public'], True)
ids[x] = data['image']['id']
# Verify no images in cached_images because no image has been hit
# yet using a GET /images/<IMAGE_ID> ...
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
self.assertEqual(data['cached_images'], [])
# Grab the images, essentially caching them...
for x in xrange(0, 4):
path = "http://%s:%d/v1/images/%s" % ("0.0.0.0", self.api_port,
ids[x])
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200,
"Failed to find image %s" % ids[x])
# Verify images now in cache
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
cached_images = data['cached_images']
self.assertEqual(4, len(cached_images))
for x in xrange(4, 0): # Cached images returned last modified order
self.assertEqual(ids[x], cached_images[x]['image_id'])
self.assertEqual(0, cached_images[x]['hits'])
# Delete third image of the cached images and verify no longer in cache
path = "http://%s:%d/v1/cached_images/%s" % ("0.0.0.0", self.api_port,
ids[2])
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
cached_images = data['cached_images']
self.assertEqual(3, len(cached_images))
self.assertTrue(ids[2] not in [x['image_id'] for x in cached_images])
# Delete all cached images and verify nothing in cache
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(response.status, 200)
path = "http://%s:%d/v1/cached_images" % ("0.0.0.0", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(response.status, 200)
data = json.loads(content)
self.assertTrue('cached_images' in data)
cached_images = data['cached_images']
self.assertEqual(0, len(cached_images))
self.stop_servers()
class TestImageCacheXattr(functional.FunctionalTest,
BaseCacheMiddlewareTest):
@ -142,6 +350,44 @@ class TestImageCacheXattr(functional.FunctionalTest,
shutil.rmtree(self.api_server.image_cache_dir)
class TestImageCacheManageXattr(functional.FunctionalTest,
BaseCacheManageMiddlewareTest):
"""
Functional tests that exercise the image cache management
with the Xattr cache driver
"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-xattr installed and xattr support on the
filesystem)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import xattr
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-xattr not installed.")
return
self.inited = True
self.disabled = False
self.cache_pipeline = "cache cache_manage"
self.image_cache_driver = "xattr"
super(TestImageCacheManageXattr, self).setUp()
def tearDown(self):
if os.path.exists(self.api_server.image_cache_dir):
shutil.rmtree(self.api_server.image_cache_dir)
class TestImageCacheSqlite(functional.FunctionalTest,
BaseCacheMiddlewareTest):
@ -177,3 +423,40 @@ class TestImageCacheSqlite(functional.FunctionalTest,
def tearDown(self):
if os.path.exists(self.api_server.image_cache_dir):
shutil.rmtree(self.api_server.image_cache_dir)
class TestImageCacheManageSqlite(functional.FunctionalTest,
BaseCacheManageMiddlewareTest):
"""
Functional tests that exercise the image cache management using the
SQLite driver
"""
def setUp(self):
"""
Test to see if the pre-requisites for the image cache
are working (python-xattr installed and xattr support on the
filesystem)
"""
if getattr(self, 'disabled', False):
return
if not getattr(self, 'inited', False):
try:
import sqlite3
except ImportError:
self.inited = True
self.disabled = True
self.disabled_message = ("python-sqlite3 not installed.")
return
self.inited = True
self.disabled = False
self.cache_pipeline = "cache cache_manage"
super(TestImageCacheManageSqlite, self).setUp()
def tearDown(self):
if os.path.exists(self.api_server.image_cache_dir):
shutil.rmtree(self.api_server.image_cache_dir)