[uwsgi] Add missing pefetch periodic job
Glance was not supporting uwsgi deployment when we added periodic job to prefetch images into cache. Later in ussuri uwsgi support was added but we missed to implement periodic job to pre-cache the image. This patch add this support for glance + uwsgi. For WSGI, we run the prefetcher with an external lock, which makes sure that multiple API workers will not attempt to cache an image at the same time. In this case, if multiple workers attempt to run at the same time, only one will grab the lock and do the work. When completed, the other worker will grab the lock and either find all the work completed, or complete new work that has been queued since the first one started. Closes-Bug: #1939307 Co-Authored-By: Dan Smith <dms@danplanet.com> Change-Id: I2abd1e60f414fbd68ce84e0b280f8b3e4e791a82
This commit is contained in:
committed by
Dan Smith
parent
929190a6af
commit
9b385f7b67
@@ -108,6 +108,24 @@ class ThreadPoolModel(object):
|
||||
fn, args, kwargs))
|
||||
return self.pool.submit(fn, *args, **kwargs)
|
||||
|
||||
def map(self, fn, iterable):
|
||||
"""Map a function to each value in an iterable.
|
||||
|
||||
This spawns a thread for each item in the provided iterable,
|
||||
generating results in the same order. Each item will spawn a
|
||||
thread, and each may run in parallel up to the limit of the
|
||||
pool.
|
||||
|
||||
:param fn: A function to work on each item
|
||||
:param iterable: A sequence of items to process
|
||||
:returns: A generator of results in the same order
|
||||
"""
|
||||
threads = []
|
||||
for i in iterable:
|
||||
threads.append(self.spawn(fn, i))
|
||||
for future in threads:
|
||||
yield future.result()
|
||||
|
||||
|
||||
class EventletThreadPoolModel(ThreadPoolModel):
|
||||
"""A ThreadPoolModel suitable for use with evenlet/greenthreads."""
|
||||
|
||||
@@ -103,6 +103,28 @@ def run_staging_cleanup():
|
||||
cleanup_thread.start()
|
||||
|
||||
|
||||
def cache_images(cache_prefetcher):
|
||||
# After every 'cache_prefetcher_interval' this call will run and fetch
|
||||
# all queued images into cache if there are any
|
||||
cache_thread = threading.Timer(CONF.cache_prefetcher_interval,
|
||||
cache_images, (cache_prefetcher,))
|
||||
cache_thread.daemon = True
|
||||
cache_thread.start()
|
||||
cache_prefetcher.run()
|
||||
|
||||
|
||||
def run_cache_prefetcher():
|
||||
if not CONF.paste_deploy.flavor == 'keystone+cachemanagement':
|
||||
LOG.debug('Cache not enabled, skipping prefetching images in cache!!!')
|
||||
return
|
||||
|
||||
# NOTE(abhishekk): Importing the prefetcher just in time to avoid
|
||||
# import loop during initialization
|
||||
from glance.image_cache import prefetcher # noqa
|
||||
cache_prefetcher = prefetcher.Prefetcher()
|
||||
cache_images(cache_prefetcher)
|
||||
|
||||
|
||||
def init_app():
|
||||
config.set_config_defaults()
|
||||
config_files = _get_config_files()
|
||||
@@ -135,6 +157,7 @@ def init_app():
|
||||
glance_store.create_stores(CONF)
|
||||
glance_store.verify_default_store()
|
||||
|
||||
run_cache_prefetcher()
|
||||
run_staging_cleanup()
|
||||
|
||||
_setup_os_profiler()
|
||||
|
||||
@@ -17,11 +17,12 @@
|
||||
Prefetches images into the Image Cache
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
import glance_store
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from glance.api import common as api_common
|
||||
from glance.common import exception
|
||||
from glance import context
|
||||
from glance.i18n import _LI, _LW
|
||||
@@ -40,8 +41,8 @@ class Prefetcher(base.CacheApp):
|
||||
self.gateway = glance.gateway.Gateway()
|
||||
|
||||
def fetch_image_into_cache(self, image_id):
|
||||
ctx = context.RequestContext(is_admin=True, show_deleted=True)
|
||||
|
||||
ctx = context.RequestContext(is_admin=True, show_deleted=True,
|
||||
roles=['admin'])
|
||||
try:
|
||||
image_repo = self.gateway.get_repo(ctx, authorization_layer=False)
|
||||
image = image_repo.get(image_id)
|
||||
@@ -70,6 +71,7 @@ class Prefetcher(base.CacheApp):
|
||||
list(cache_tee_iter)
|
||||
return True
|
||||
|
||||
@lockutils.lock('glance-cache', external=True)
|
||||
def run(self):
|
||||
images = self.cache.get_queued_images()
|
||||
if not images:
|
||||
@@ -79,8 +81,8 @@ class Prefetcher(base.CacheApp):
|
||||
num_images = len(images)
|
||||
LOG.debug("Found %d images to prefetch", num_images)
|
||||
|
||||
pool = eventlet.GreenPool(num_images)
|
||||
results = pool.imap(self.fetch_image_into_cache, images)
|
||||
pool = api_common.get_thread_pool('prefetcher', size=num_images)
|
||||
results = pool.map(self.fetch_image_into_cache, images)
|
||||
successes = sum([1 for r in results if r is True])
|
||||
if successes != num_images:
|
||||
LOG.warn(_LW("Failed to successfully cache all "
|
||||
|
||||
@@ -245,6 +245,11 @@ class TestSystemThreadPoolModel(test_utils.BaseTestCase):
|
||||
# This isn't used anywhere, but make sure we get the future
|
||||
self.assertEqual(pool.submit.return_value, result)
|
||||
|
||||
def test_model_map(self):
|
||||
model = glance.async_.EventletThreadPoolModel()
|
||||
results = model.map(lambda s: s.upper(), ['a', 'b', 'c'])
|
||||
self.assertEqual(['A', 'B', 'C'], list(results))
|
||||
|
||||
@mock.patch('glance.async_.ThreadPoolModel.get_threadpool_executor_class')
|
||||
def test_base_model_init_with_size(self, mock_gte):
|
||||
mock_gte.return_value.__name__ = 'TestModel'
|
||||
|
||||
@@ -112,3 +112,29 @@ class TestWsgiAppInit(test_utils.BaseTestCase):
|
||||
target=mock_cleaner().clean_orphaned_staging_residue,
|
||||
daemon=True)
|
||||
mock_Thread.return_value.start.assert_called_once_with()
|
||||
|
||||
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
|
||||
@mock.patch('glance.common.config.load_paste_app')
|
||||
@mock.patch('glance.common.wsgi_app._get_config_files')
|
||||
@mock.patch('threading.Timer')
|
||||
@mock.patch('glance.image_cache.prefetcher.Prefetcher')
|
||||
def test_run_cache_prefetcher(self, mock_prefetcher,
|
||||
mock_Timer, mock_conf,
|
||||
mock_load):
|
||||
self.config(cache_prefetcher_interval=10)
|
||||
self.config(flavor='keystone+cachemanagement', group='paste_deploy')
|
||||
mock_conf.return_value = []
|
||||
wsgi_app.init_app()
|
||||
mock_Timer.assert_called_once_with(10, mock.ANY, (mock_prefetcher(),))
|
||||
mock_Timer.return_value.start.assert_called_once_with()
|
||||
|
||||
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
|
||||
@mock.patch('glance.common.config.load_paste_app')
|
||||
@mock.patch('glance.common.wsgi_app._get_config_files')
|
||||
@mock.patch('threading.Timer')
|
||||
@mock.patch('glance.image_cache.prefetcher.Prefetcher')
|
||||
def test_run_cache_prefetcher_middleware_disabled(
|
||||
self, mock_prefetcher, mock_Timer, mock_conf, mock_load):
|
||||
mock_conf.return_value = []
|
||||
wsgi_app.init_app()
|
||||
mock_Timer.assert_not_called()
|
||||
|
||||
@@ -16,25 +16,33 @@
|
||||
from contextlib import contextmanager
|
||||
import datetime
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
import fixtures
|
||||
import glance_store as store
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import secretutils
|
||||
from oslo_utils import units
|
||||
import six
|
||||
# NOTE(jokke): simplified transition to py3, behaves like py2 xrange
|
||||
from six.moves import range
|
||||
|
||||
from glance import async_
|
||||
from glance.common import exception
|
||||
from glance import context
|
||||
from glance import gateway as glance_gateway
|
||||
from glance import image_cache
|
||||
from glance.image_cache import prefetcher
|
||||
from glance.tests.unit import utils as unit_test_utils
|
||||
from glance.tests import utils as test_utils
|
||||
from glance.tests.utils import skip_if_disabled
|
||||
from glance.tests.utils import xattr_writes_supported
|
||||
|
||||
FIXTURE_LENGTH = 1024
|
||||
FIXTURE_DATA = b'*' * FIXTURE_LENGTH
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ImageCacheTestCase(object):
|
||||
@@ -512,6 +520,61 @@ class TestImageCacheSqlite(test_utils.BaseTestCase,
|
||||
image_cache_max_size=5 * units.Ki)
|
||||
self.cache = image_cache.ImageCache()
|
||||
|
||||
@mock.patch('glance.db.get_api')
|
||||
def _test_prefetcher(self, mock_get_db):
|
||||
self.config(enabled_backends={'cheap': 'file'})
|
||||
store.register_store_opts(CONF)
|
||||
self.config(filesystem_store_datadir='/tmp', group='cheap')
|
||||
store.create_multi_stores(CONF)
|
||||
|
||||
tempf = tempfile.NamedTemporaryFile()
|
||||
tempf.write(b'foo')
|
||||
|
||||
db = unit_test_utils.FakeDB(initialize=False)
|
||||
mock_get_db.return_value = db
|
||||
|
||||
ctx = context.RequestContext(is_admin=True, roles=['admin'])
|
||||
gateway = glance_gateway.Gateway()
|
||||
image_factory = gateway.get_image_factory(ctx,
|
||||
authorization_layer=False)
|
||||
image_repo = gateway.get_repo(ctx, authorization_layer=False)
|
||||
fetcher = prefetcher.Prefetcher()
|
||||
|
||||
# Create an image with no values set and queue it
|
||||
image = image_factory.new_image()
|
||||
image_repo.add(image)
|
||||
fetcher.cache.queue_image(image.image_id)
|
||||
|
||||
# Image is not active, so it should fail to cache, but remain queued
|
||||
self.assertFalse(fetcher.run())
|
||||
self.assertFalse(fetcher.cache.is_cached(image.image_id))
|
||||
self.assertTrue(fetcher.cache.is_queued(image.image_id))
|
||||
|
||||
# Set the disk/container format and give it a location
|
||||
image.disk_format = 'raw'
|
||||
image.container_format = 'bare'
|
||||
image.status = 'active'
|
||||
loc = {'url': 'file://%s' % tempf.name, 'metadata': {'store': 'cheap'}}
|
||||
with mock.patch('glance.location._check_image_location'):
|
||||
# FIXME(danms): Why do I have to do this?
|
||||
image.locations = [loc]
|
||||
image_repo.save(image)
|
||||
|
||||
# Image is now active and has a location, so it should cache
|
||||
self.assertTrue(fetcher.run())
|
||||
self.assertTrue(fetcher.cache.is_cached(image.image_id))
|
||||
self.assertFalse(fetcher.cache.is_queued(image.image_id))
|
||||
|
||||
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
|
||||
def test_prefetcher_greenthread(self):
|
||||
async_.set_threadpool_model('eventlet')
|
||||
self._test_prefetcher()
|
||||
|
||||
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
|
||||
def test_prefetcher_native(self):
|
||||
async_.set_threadpool_model('native')
|
||||
self._test_prefetcher()
|
||||
|
||||
|
||||
class TestImageCacheNoDep(test_utils.BaseTestCase):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user