Add periodic job to prefetch images into cache
Added new periodic job which will run as per interval set using 'cache_prefetcher_interval' configuration option and fetch images which are queued for caching in cache directory. DocImpact Change-Id: If2875f7a215aca10a6ed2febc89b02219f90a10d
This commit is contained in:
parent
e475581c72
commit
73fefddd96
|
@ -75,6 +75,8 @@ correctly.
|
|||
- ``filesystem_store_datadirs`` This is used to point to multiple
|
||||
filesystem stores.
|
||||
- ``registry_host`` The URL to the Glance registry.
|
||||
- ``cache_prefetcher_interval`` The interval in seconds to run periodic
|
||||
job 'cache_images'.
|
||||
|
||||
Controlling the Growth of the Image Cache
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -107,18 +109,20 @@ The recommended practice is to use ``cron`` to fire ``glance-cache-cleaner``
|
|||
at a semi-regular interval.
|
||||
|
||||
Prefetching Images into the Image Cache
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
---------------------------------------
|
||||
|
||||
Some installations have base (sometimes called "golden") images that are
|
||||
very commonly used to boot virtual machines. When spinning up a new API
|
||||
server, administrators may wish to prefetch these image files into the
|
||||
local image cache to ensure that reads of those popular image files come
|
||||
from a local cache.
|
||||
In Train, Glance API has added a new periodic job ``cache_images`` which will
|
||||
run after every predefined time interval to fetch the queued images into cache.
|
||||
The default time interval for the ``cache_images`` periodic job is 300
|
||||
seconds. Admin/Operator can configure this interval in glance-api.conf file or
|
||||
glance-cache.conf file using ``cache_prefetcher_interval`` configuration
|
||||
option. The ``cache_images`` periodic job will only run if cache middleware
|
||||
is enabled in your cloud.
|
||||
|
||||
To queue an image for prefetching, you can use one of the following methods:
|
||||
|
||||
* If the ``cache_manage`` middleware is enabled in the application pipeline,
|
||||
you may call ``PUT /queued-images/<IMAGE_ID>`` to queue the image with
|
||||
you may call ``PUT /queued/<IMAGE_ID>`` to queue the image with
|
||||
identifier ``<IMAGE_ID>``
|
||||
|
||||
* Alternately, you can use the ``glance-cache-manage`` program to queue the
|
||||
|
@ -129,9 +133,9 @@ To queue an image for prefetching, you can use one of the following methods:
|
|||
|
||||
This will queue the image with identifier ``<IMAGE_ID>`` for prefetching
|
||||
|
||||
Once you have queued the images you wish to prefetch, call the
|
||||
``glance-cache-prefetcher`` executable, which will prefetch all queued images
|
||||
concurrently, logging the results of the fetch for each image.
|
||||
Once you have queued the images you wish to prefetch, the ``cache_images``
|
||||
periodic job will prefetch all queued images concurrently, logging the
|
||||
results of the fetch for each image.
|
||||
|
||||
Finding Which Images are in the Image Cache
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
|
|
@ -97,7 +97,15 @@ def main():
|
|||
host=CONF.bind_host
|
||||
)
|
||||
|
||||
server = wsgi.Server(initialize_glance_store=True)
|
||||
# NOTE(abhishekk): Added initialize_prefetcher KW argument to Server
|
||||
# object so that prefetcher object should only be initialized in case
|
||||
# of API service and ignored in case of registry. Once registry is
|
||||
# removed this parameter should be removed as well.
|
||||
initialize_prefetcher = False
|
||||
if CONF.paste_deploy.flavor == 'keystone+cachemanagement':
|
||||
initialize_prefetcher = True
|
||||
server = wsgi.Server(initialize_glance_store=True,
|
||||
initialize_prefetcher=initialize_prefetcher)
|
||||
server.start(config.load_paste_app('glance-api'), default_port=9292)
|
||||
server.wait()
|
||||
except Exception as e:
|
||||
|
|
|
@ -30,6 +30,7 @@ import signal
|
|||
import struct
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from eventlet.green import socket
|
||||
|
@ -56,6 +57,7 @@ from glance.common import exception
|
|||
from glance.common import utils
|
||||
from glance import i18n
|
||||
from glance.i18n import _, _LE, _LI, _LW
|
||||
from glance.image_cache import prefetcher
|
||||
|
||||
|
||||
bind_opts = [
|
||||
|
@ -330,6 +332,23 @@ cli_opts = [
|
|||
'used for inter-process communication.'),
|
||||
]
|
||||
|
||||
cache_opts = [
|
||||
cfg.FloatOpt('cache_prefetcher_interval',
|
||||
default=300,
|
||||
help=_("""
|
||||
The interval in seconds to run periodic job cache_images.
|
||||
|
||||
The cache_images method will fetch all images which are in queued state
|
||||
for caching in cache directory. The default value is 300.
|
||||
|
||||
Possible values:
|
||||
* Positive integer
|
||||
|
||||
Related options:
|
||||
* None
|
||||
"""))
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -338,6 +357,7 @@ CONF.register_opts(socket_opts)
|
|||
CONF.register_opts(eventlet_opts)
|
||||
CONF.register_opts(wsgi_opts)
|
||||
CONF.register_opts(store_opts)
|
||||
CONF.register_opts(cache_opts)
|
||||
profiler_opts.set_defaults(CONF)
|
||||
|
||||
ASYNC_EVENTLET_THREAD_POOL_LIST = []
|
||||
|
@ -502,7 +522,8 @@ class BaseServer(object):
|
|||
This class requires initialize_glance_store set to True if
|
||||
glance store needs to be initialized.
|
||||
"""
|
||||
def __init__(self, threads=1000, initialize_glance_store=False):
|
||||
def __init__(self, threads=1000, initialize_glance_store=False,
|
||||
initialize_prefetcher=False):
|
||||
os.umask(0o27) # ensure files are created with the correct privileges
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
self.threads = threads
|
||||
|
@ -512,6 +533,18 @@ class BaseServer(object):
|
|||
# NOTE(abhishek): Allows us to only re-initialize glance_store when
|
||||
# the API's configuration reloads.
|
||||
self.initialize_glance_store = initialize_glance_store
|
||||
self.initialize_prefetcher = initialize_prefetcher
|
||||
if self.initialize_prefetcher:
|
||||
self.prefetcher = prefetcher.Prefetcher()
|
||||
|
||||
def cache_images(self):
|
||||
# After every 'cache_prefetcher_interval' this call will run and fetch
|
||||
# all queued images into cache if there are any
|
||||
cache_thread = threading.Timer(CONF.cache_prefetcher_interval,
|
||||
self.cache_images)
|
||||
cache_thread.daemon = True
|
||||
cache_thread.start()
|
||||
self.prefetcher.run()
|
||||
|
||||
@staticmethod
|
||||
def set_signal_handler(signal_name, handler):
|
||||
|
@ -553,6 +586,8 @@ class BaseServer(object):
|
|||
self.default_port = default_port
|
||||
self.configure()
|
||||
self.start_wsgi()
|
||||
if self.initialize_prefetcher:
|
||||
self.cache_images()
|
||||
|
||||
def start_wsgi(self):
|
||||
workers = get_num_workers()
|
||||
|
|
|
@ -19,51 +19,58 @@ Prefetches images into the Image Cache
|
|||
|
||||
import eventlet
|
||||
import glance_store
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from glance.common import exception
|
||||
from glance import context
|
||||
from glance.i18n import _LI, _LW
|
||||
from glance.image_cache import base
|
||||
import glance.registry.client.v1.api as registry
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Prefetcher(base.CacheApp):
|
||||
|
||||
def __init__(self):
|
||||
# NOTE(abhishekk): Importing the glance.gateway just in time to avoid
|
||||
# import loop during initialization
|
||||
import glance.gateway # noqa
|
||||
super(Prefetcher, self).__init__()
|
||||
registry.configure_registry_client()
|
||||
registry.configure_registry_admin_creds()
|
||||
self.gateway = glance.gateway.Gateway()
|
||||
|
||||
def fetch_image_into_cache(self, image_id):
|
||||
ctx = context.RequestContext(is_admin=True, show_deleted=True)
|
||||
|
||||
try:
|
||||
image_meta = registry.get_image_metadata(ctx, image_id)
|
||||
if image_meta['status'] != 'active':
|
||||
LOG.warn(_LW("Image '%s' is not active. Not caching.") %
|
||||
image_id)
|
||||
return False
|
||||
|
||||
image_repo = self.gateway.get_repo(ctx)
|
||||
image = image_repo.get(image_id)
|
||||
except exception.NotFound:
|
||||
LOG.warn(_LW("No metadata found for image '%s'") % image_id)
|
||||
LOG.warn(_LW("Image '%s' not found") % image_id)
|
||||
return False
|
||||
|
||||
location = image_meta['location']
|
||||
image_data, image_size = glance_store.get_from_backend(location,
|
||||
context=ctx)
|
||||
LOG.debug("Caching image '%s'", image_id)
|
||||
cache_tee_iter = self.cache.cache_tee_iter(image_id, image_data,
|
||||
image_meta['checksum'])
|
||||
# Image is tee'd into cache and checksum verified
|
||||
# as we iterate
|
||||
list(cache_tee_iter)
|
||||
return True
|
||||
if image.status != 'active':
|
||||
LOG.warn(_LW("Image '%s' is not active. Not caching.") % image_id)
|
||||
return False
|
||||
|
||||
for loc in image.locations:
|
||||
if CONF.enabled_backends:
|
||||
image_data, image_size = glance_store.get(loc['url'],
|
||||
None,
|
||||
context=ctx)
|
||||
else:
|
||||
image_data, image_size = glance_store.get_from_backend(
|
||||
loc['url'], context=ctx)
|
||||
|
||||
LOG.debug("Caching image '%s'", image_id)
|
||||
cache_tee_iter = self.cache.cache_tee_iter(image_id, image_data,
|
||||
image.checksum)
|
||||
# Image is tee'd into cache and checksum verified
|
||||
# as we iterate
|
||||
list(cache_tee_iter)
|
||||
return True
|
||||
|
||||
def run(self):
|
||||
|
||||
images = self.cache.get_queued_images()
|
||||
if not images:
|
||||
LOG.debug("Nothing to prefetch.")
|
||||
|
|
|
@ -759,9 +759,14 @@ class RegistryServer(Server):
|
|||
self.policy_file = policy_file
|
||||
self.policy_default_rule = 'default'
|
||||
self.disable_path = None
|
||||
self.image_cache_dir = os.path.join(self.test_dir,
|
||||
'cache')
|
||||
self.image_cache_driver = 'sqlite'
|
||||
|
||||
self.conf_base = """[DEFAULT]
|
||||
debug = %(debug)s
|
||||
image_cache_dir = %(image_cache_dir)s
|
||||
image_cache_driver = %(image_cache_driver)s
|
||||
bind_host = %(bind_host)s
|
||||
bind_port = %(bind_port)s
|
||||
log_file = %(log_file)s
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# under the License.
|
||||
|
||||
"""Functional test asserting strongly typed exceptions from glance client"""
|
||||
import os
|
||||
|
||||
import eventlet.patcher
|
||||
import httplib2
|
||||
|
@ -70,6 +71,10 @@ class TestClientExceptions(functional.FunctionalTest):
|
|||
def setUp(self):
|
||||
super(TestClientExceptions, self).setUp()
|
||||
self.port = utils.get_unused_port()
|
||||
self.image_cache_dir = os.path.join(self.test_dir,
|
||||
'cache')
|
||||
self.config(image_cache_dir=self.image_cache_dir)
|
||||
self.config(image_cache_driver='sqlite')
|
||||
server = wsgi.Server()
|
||||
self.config(bind_host='127.0.0.1')
|
||||
self.config(workers=0)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
"""Functional test cases testing glance client redirect-following."""
|
||||
import os
|
||||
|
||||
import eventlet.patcher
|
||||
from six.moves import http_client as http
|
||||
|
@ -85,6 +86,10 @@ class TestClientRedirects(functional.FunctionalTest):
|
|||
super(TestClientRedirects, self).setUp()
|
||||
self.port_one = utils.get_unused_port()
|
||||
self.port_two = utils.get_unused_port()
|
||||
self.image_cache_dir = os.path.join(self.test_dir,
|
||||
'cache')
|
||||
self.config(image_cache_dir=self.image_cache_dir)
|
||||
self.config(image_cache_driver='sqlite')
|
||||
server_one = wsgi.Server()
|
||||
server_two = wsgi.Server()
|
||||
self.config(bind_host='127.0.0.1')
|
||||
|
|
|
@ -19,19 +19,21 @@ import os
|
|||
import socket
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
import fixtures
|
||||
|
||||
from glance.common import wsgi
|
||||
|
||||
CONF = cfg.CONF
|
||||
from glance.tests import functional
|
||||
|
||||
|
||||
class TestWSGIServer(testtools.TestCase):
|
||||
class TestWSGIServer(functional.FunctionalTest):
|
||||
"""WSGI server tests."""
|
||||
def test_client_socket_timeout(self):
|
||||
CONF.set_default("workers", 0)
|
||||
CONF.set_default("client_socket_timeout", 1)
|
||||
test_dir = self.useFixture(fixtures.TempDir()).path
|
||||
image_cache_dir = os.path.join(test_dir, 'cache')
|
||||
self.config(workers=0)
|
||||
self.config(client_socket_timeout=1)
|
||||
self.config(image_cache_dir=image_cache_dir)
|
||||
self.config(image_cache_driver="sqlite")
|
||||
"""Verify connections are timed out as per 'client_socket_timeout'"""
|
||||
greetings = b'Hello, World!!!'
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import glance.common.config
|
|||
from glance.common import exception as exc
|
||||
import glance.common.wsgi
|
||||
import glance.image_cache.cleaner
|
||||
from glance.image_cache import prefetcher
|
||||
import glance.image_cache.pruner
|
||||
from glance.tests import utils as test_utils
|
||||
|
||||
|
@ -64,11 +65,13 @@ class TestGlanceApiCmd(test_utils.BaseTestCase):
|
|||
sys.argv = self.__argv_backup
|
||||
super(TestGlanceApiCmd, self).tearDown()
|
||||
|
||||
def test_supported_default_store(self):
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
def test_supported_default_store(self, mock_prefetcher):
|
||||
self.config(group='glance_store', default_store='file')
|
||||
glance.cmd.api.main()
|
||||
|
||||
def test_worker_creation_failure(self):
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
def test_worker_creation_failure(self, mock_prefetcher):
|
||||
failure = exc.WorkerCreationFailure(reason='test')
|
||||
self.mock_object(glance.common.wsgi.Server, 'start',
|
||||
self._raise(failure))
|
||||
|
|
|
@ -36,6 +36,7 @@ from glance.common import exception
|
|||
from glance.common import utils
|
||||
from glance.common import wsgi
|
||||
from glance import i18n
|
||||
from glance.image_cache import prefetcher
|
||||
from glance.tests import utils as test_utils
|
||||
|
||||
|
||||
|
@ -561,13 +562,15 @@ class JSONRequestDeserializerTest(test_utils.BaseTestCase):
|
|||
|
||||
|
||||
class ServerTest(test_utils.BaseTestCase):
|
||||
def test_create_pool(self):
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
def test_create_pool(self, mock_prefetcher):
|
||||
"""Ensure the wsgi thread pool is an eventlet.greenpool.GreenPool."""
|
||||
actual = wsgi.Server(threads=1).create_pool()
|
||||
self.assertIsInstance(actual, eventlet.greenpool.GreenPool)
|
||||
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
@mock.patch.object(wsgi.Server, 'configure_socket')
|
||||
def test_http_keepalive(self, mock_configure_socket):
|
||||
def test_http_keepalive(self, mock_configure_socket, mock_prefetcher):
|
||||
self.config(http_keepalive=False)
|
||||
self.config(workers=0)
|
||||
|
||||
|
@ -588,7 +591,8 @@ class ServerTest(test_utils.BaseTestCase):
|
|||
keepalive=False,
|
||||
socket_timeout=900)
|
||||
|
||||
def test_number_of_workers_posix(self):
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
def test_number_of_workers_posix(self, mock_prefetcher):
|
||||
"""Ensure the number of workers matches num cpus limited to 8."""
|
||||
if os.name == 'nt':
|
||||
raise self.skipException("Unsupported platform.")
|
||||
|
@ -696,7 +700,8 @@ class GetSocketTestCase(test_utils.BaseTestCase):
|
|||
wsgi.CONF.ca_file = '/etc/ssl/ca_cert'
|
||||
wsgi.CONF.tcp_keepidle = 600
|
||||
|
||||
def test_correct_configure_socket(self):
|
||||
@mock.patch.object(prefetcher, 'Prefetcher')
|
||||
def test_correct_configure_socket(self, mock_prefetcher):
|
||||
mock_socket = mock.Mock()
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'glance.common.wsgi.ssl.wrap_socket',
|
||||
|
|
Loading…
Reference in New Issue