Make glance-api able to do async tasks in WSGI mode

This teaches glance-api how to do async threading things when it is
running in pure-WSGI mode. In order to do that, a refactoring of things
that currently depend on eventlet is required.

It adds a [wsgi]/task_pool_threads configuration knob, which is used
in the case of pure-WSGI and native threads to constrain the number
of threads in that pool (and thus the task parallelism). This will
allow tuning by the operator, but also lets us default that to just
a single thread in the backport of these fixes so that we can avoid
introducing a new larger footprint in the backport unexpectedly.

Partial-Bug: #1888713
Depends-On: https://review.opendev.org/#/c/742047/
Change-Id: Ie15028b75fb8518ec2b0c0c0386d21782166f759
This commit is contained in:
Dan Smith 2020-07-20 20:49:34 -07:00
parent 94193c0453
commit 16a5431c66
18 changed files with 392 additions and 29 deletions

View File

@ -228,6 +228,21 @@
inject: |
"glance_devstack_test":"doyouseeme?"
- job:
name: tempest-integrated-storage-wsgi-import
parent: tempest-integrated-storage
description: |
The regular tempest-integrated-storage job but with glance in wsgi mode
vars:
devstack_localrc:
GLANCE_STANDALONE: False
GLANCE_USE_IMPORT_WORKFLOW: True
devstack_local_conf:
post-config:
$GLANCE_API_CONF:
DEFAULT:
enabled_import_methods: "[\"copy-image\", \"glance-direct\"]"
- project:
templates:
- check-requirements
@ -261,6 +276,8 @@
irrelevant-files: *tempest-irrelevant-files
- tempest-integrated-storage-import-workflow:
irrelevant-files: *tempest-irrelevant-files
- tempest-integrated-storage-wsgi-import:
irrelevant-files: *tempest-irrelevant-files
- grenade:
irrelevant-files: *tempest-irrelevant-files
- tempest-ipv6-only:

View File

@ -21,8 +21,8 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import units
import glance.async_
from glance.common import exception
from glance.common import wsgi
from glance.i18n import _, _LE, _LW
LOG = logging.getLogger(__name__)
@ -197,20 +197,33 @@ def memoize(lock_name):
return memoizer_wrapper
def get_thread_pool(lock_name, size=1024):
"""Initializes eventlet thread pool.
# NOTE(danms): This is the default pool size that will be used for
# the get_thread_pool() cache wrapper below. This is a global here
# because it needs to be overridden for the pure-wsgi mode in
# wsgi_app.py where native threads are used.
DEFAULT_POOL_SIZE = 1024
def get_thread_pool(lock_name, size=None):
"""Initializes thread pool.
If thread pool is present in cache, then returns it from cache
else create new pool, stores it in cache and return newly created
pool.
@param lock_name: Name of the lock.
@param size: Size of eventlet pool.
@param size: Size of pool.
@return: eventlet pool
@return: ThreadPoolModel
"""
if size is None:
size = DEFAULT_POOL_SIZE
@memoize(lock_name)
def _get_thread_pool():
return wsgi.get_asynchronous_eventlet_pool(size=size)
threadpool_cls = glance.async_.get_threadpool_model()
LOG.debug('Initializing named threadpool %r', lock_name)
return threadpool_cls(size)
return _get_thread_pool

View File

@ -216,8 +216,8 @@ class ImagesController(object):
task_input=task_input)
task_repo.add(import_task)
task_executor = executor_factory.new_task_executor(req.context)
pool = common.get_thread_pool("tasks_eventlet_pool")
pool.spawn_n(import_task.run, task_executor)
pool = common.get_thread_pool("tasks_pool")
pool.spawn(import_task.run, task_executor)
except exception.Forbidden as e:
LOG.debug("User not permitted to create image import task.")
raise webob.exc.HTTPForbidden(explanation=e.msg)

View File

@ -78,8 +78,8 @@ class TasksController(object):
task_input=task['input'])
task_repo.add(new_task)
task_executor = executor_factory.new_task_executor(req.context)
pool = common.get_thread_pool("tasks_eventlet_pool")
pool.spawn_n(new_task.run, task_executor)
pool = common.get_thread_pool("tasks_pool")
pool.spawn(new_task.run, task_executor)
except exception.Forbidden as e:
msg = (_LW("Forbidden to create task. Reason: %(reason)s")
% {'reason': encodeutils.exception_to_unicode(e)})

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from oslo_log import log as logging
from glance.i18n import _LE
@ -75,3 +76,109 @@ class TaskExecutor(object):
LOG.error(msg)
task.fail(_LE("Internal error occurred while trying to process task."))
self.task_repo.save(task)
class ThreadPoolModel(object):
"""Base class for an abstract ThreadPool.
Do not instantiate this directly, use one of the concrete
implementations.
"""
DEFAULTSIZE = 1
@staticmethod
def get_threadpool_executor_class():
"""Returns a futurist.ThreadPoolExecutor class."""
pass
def __init__(self, size=None):
if size is None:
size = self.DEFAULTSIZE
threadpool_cls = self.get_threadpool_executor_class()
LOG.debug('Creating threadpool model %r with size %i',
threadpool_cls.__name__, size)
self.pool = threadpool_cls(size)
def spawn(self, fn, *args, **kwargs):
"""Spawn a function with args using the thread pool."""
LOG.debug('Spawning with %s: %s(%s, %s)' % (
self.get_threadpool_executor_class().__name__,
fn, args, kwargs))
return self.pool.submit(fn, *args, **kwargs)
class EventletThreadPoolModel(ThreadPoolModel):
"""A ThreadPoolModel suitable for use with evenlet/greenthreads."""
DEFAULTSIZE = 1024
@staticmethod
def get_threadpool_executor_class():
return futurist.GreenThreadPoolExecutor
class NativeThreadPoolModel(ThreadPoolModel):
"""A ThreadPoolModel suitable for use with native threads."""
DEFAULTSIZE = 16
@staticmethod
def get_threadpool_executor_class():
return futurist.ThreadPoolExecutor
_THREADPOOL_MODEL = None
def set_threadpool_model(thread_type):
"""Set the system-wide threadpool model.
This sets the type of ThreadPoolModel to use globally in the process.
It should be called very early in init, and only once.
:param thread_type: A string indicating the threading type in use,
either "eventlet" or "native"
:raises: RuntimeError if the model is already set or some thread_type
other than one of the supported ones is provided.
"""
global _THREADPOOL_MODEL
if thread_type == 'native':
model = NativeThreadPoolModel
elif thread_type == 'eventlet':
model = EventletThreadPoolModel
else:
raise RuntimeError(
('Invalid thread type %r '
'(must be "native" or "eventlet")') % (thread_type))
if _THREADPOOL_MODEL is model:
# Re-setting the same model is fine...
return
if _THREADPOOL_MODEL is not None:
# ...changing it is not.
raise RuntimeError('Thread model is already set')
LOG.info('Threadpool model set to %r', model.__name__)
_THREADPOOL_MODEL = model
def get_threadpool_model():
"""Returns the system-wide threadpool model class.
This must be called after set_threadpool_model() whenever
some code needs to know what the threadpool implementation is.
This may only be called after set_threadpool_model() has been
called to set the desired threading mode. If it is called before
the model is set, it will raise AssertionError. This would likely
be the case if this got run in a test before the model was
initialized, or if glance modules that use threading were imported
and run from some other code without setting the model first.
:raises: AssertionError if the model has not yet been set.
"""
global _THREADPOOL_MODEL
assert _THREADPOOL_MODEL
return _THREADPOOL_MODEL

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
@ -104,13 +103,8 @@ class TaskExecutor(glance.async_.TaskExecutor):
return None
else:
max_workers = CONF.taskflow_executor.max_workers
try:
return futurist.GreenThreadPoolExecutor(
max_workers=max_workers)
except RuntimeError:
# NOTE(harlowja): I guess eventlet isn't being made
# useable, well just use native threads then (or try to).
return futurist.ThreadPoolExecutor(max_workers=max_workers)
threadpool_cls = glance.async_.get_threadpool_model()
return threadpool_cls(max_workers).pool
def _get_flow(self, task):
try:

View File

@ -62,6 +62,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import osprofiler.initializer
import glance.async_
from glance.common import config
from glance.common import exception
from glance.common import wsgi
@ -107,6 +108,9 @@ def main():
host=CONF.bind_host
)
# NOTE(danms): Configure system-wide threading model to use eventlet
glance.async_.set_threadpool_model('eventlet')
# NOTE(abhishekk): Added initialize_prefetcher KW argument to Server
# object so that prefetcher object should only be initialized in case
# of API service and ignored in case of registry. Once registry is

View File

@ -566,11 +566,27 @@ Related options:
* [DEFAULT]/node_staging_uri""")),
]
wsgi_opts = [
cfg.IntOpt('task_pool_threads',
default=16,
min=1,
help=_("""
The number of threads (per worker process) in the pool for processing
asynchronous tasks. This controls how many asynchronous tasks (i.e. for
image interoperable import) each worker can run at a time. If this is
too large, you *may* have increased memory footprint per worker and/or you
may overwhelm other system resources such as disk or outbound network
bandwidth. If this is too small, image import requests will have to wait
until a thread becomes available to begin processing.""")),
]
CONF = cfg.CONF
CONF.register_opts(paste_deploy_opts, group='paste_deploy')
CONF.register_opts(image_format_opts, group='image_format')
CONF.register_opts(task_opts, group='task')
CONF.register_opts(common_opts)
CONF.register_opts(wsgi_opts, group='wsgi')
policy.Enforcer(CONF)

View File

@ -17,6 +17,8 @@ from oslo_config import cfg
from oslo_log import log as logging
import osprofiler.initializer
from glance.api import common
import glance.async_
from glance.common import config
from glance.common import store_utils
from glance.i18n import _
@ -71,6 +73,17 @@ def init_app():
CONF([], project='glance', default_config_files=config_files)
logging.setup(CONF, "glance")
# NOTE(danms): We are running inside uwsgi or mod_wsgi, so no eventlet;
# use native threading instead.
glance.async_.set_threadpool_model('native')
# NOTE(danms): Change the default threadpool size since we
# are dealing with native threads and not greenthreads.
# Right now, the only pool of default size is tasks_pool,
# so if others are created this will need to change to be
# more specific.
common.DEFAULT_POOL_SIZE = CONF.wsgi.task_pool_threads
if CONF.enabled_backends:
if store_utils.check_reserved_stores(CONF.enabled_backends):
msg = _("'os_glance_' prefix should not be used in "

View File

@ -30,6 +30,10 @@ if os.name == 'nt':
else:
eventlet.patcher.monkey_patch()
import glance.async_
# NOTE(danms): Default to eventlet threading for tests
glance.async_.set_threadpool_model('eventlet')
# See http://code.google.com/p/python-nose/issues/detail?id=373
# The code below enables tests to work with i18n _() blocks
import six.moves.builtins as __builtin__

View File

@ -62,6 +62,14 @@ else:
CONF = cfg.CONF
import glance.async_
# NOTE(danms): Default to eventlet threading for tests
try:
glance.async_.set_threadpool_model('eventlet')
except RuntimeError:
pass
@six.add_metaclass(abc.ABCMeta)
class BaseServer(object):
"""

View File

@ -65,12 +65,16 @@ class TestGlanceApiCmd(test_utils.BaseTestCase):
sys.argv = self.__argv_backup
super(TestGlanceApiCmd, self).tearDown()
@mock.patch('glance.async_.set_threadpool_model',)
@mock.patch.object(prefetcher, 'Prefetcher')
def test_supported_default_store(self, mock_prefetcher):
def test_supported_default_store(self, mock_prefetcher, mock_set_model):
self.config(group='glance_store', default_store='file')
glance.cmd.api.main()
# Make sure we declared the system threadpool model as eventlet
mock_set_model.assert_called_once_with('eventlet')
@mock.patch.object(prefetcher, 'Prefetcher')
@mock.patch('glance.async_.set_threadpool_model', new=mock.MagicMock())
def test_worker_creation_failure(self, mock_prefetcher):
failure = exc.WorkerCreationFailure(reason='test')
self.mock_object(glance.common.wsgi.Server, 'start',

View File

@ -14,6 +14,7 @@
# under the License.
import testtools
from unittest import mock
import webob
import glance.api.common
@ -124,3 +125,27 @@ class TestSizeCheckedIter(testtools.TestCase):
self.assertEqual('CD', next(checked_image))
self.assertEqual('E', next(checked_image))
self.assertRaises(exception.GlanceException, next, checked_image)
class TestThreadPool(testtools.TestCase):
@mock.patch('glance.async_.get_threadpool_model')
def test_get_thread_pool(self, mock_gtm):
get_thread_pool = glance.api.common.get_thread_pool
pool1 = get_thread_pool('pool1', size=123)
get_thread_pool('pool2', size=456)
pool1a = get_thread_pool('pool1')
# Two calls for the same pool should return the exact same thing
self.assertEqual(pool1, pool1a)
# Only two calls to get new threadpools should have been made
mock_gtm.return_value.assert_has_calls(
[mock.call(123), mock.call(456)])
@mock.patch('glance.async_.get_threadpool_model')
def test_get_thread_pool_log(self, mock_gtm):
with mock.patch.object(glance.api.common, 'LOG') as mock_log:
glance.api.common.get_thread_pool('test-pool')
mock_log.debug.assert_called_once_with(
'Initializing named threadpool %r', 'test-pool')

View File

@ -16,6 +16,7 @@
from unittest import mock
import futurist
import glance_store as store
from oslo_config import cfg
from taskflow.patterns import linear_flow
@ -206,3 +207,90 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
for c in self.base_flow:
self.assertIn(c, flow_comp)
self.assertIn('CopyImage', flow_comp)
@mock.patch('glance.async_._THREADPOOL_MODEL', new=None)
class TestSystemThreadPoolModel(test_utils.BaseTestCase):
def test_eventlet_model(self):
model_cls = glance.async_.EventletThreadPoolModel
self.assertEqual(futurist.GreenThreadPoolExecutor,
model_cls.get_threadpool_executor_class())
def test_native_model(self):
model_cls = glance.async_.NativeThreadPoolModel
self.assertEqual(futurist.ThreadPoolExecutor,
model_cls.get_threadpool_executor_class())
@mock.patch('glance.async_.ThreadPoolModel.get_threadpool_executor_class')
def test_base_model_spawn(self, mock_gte):
pool_cls = mock.MagicMock()
pool_cls.configure_mock(__name__='fake')
mock_gte.return_value = pool_cls
model = glance.async_.ThreadPoolModel()
result = model.spawn(print, 'foo', bar='baz')
pool = pool_cls.return_value
# Make sure the default size was passed to the executor
pool_cls.assert_called_once_with(1)
# Make sure we submitted the function to the executor
pool.submit.assert_called_once_with(print, 'foo', bar='baz')
# This isn't used anywhere, but make sure we get the future
self.assertEqual(pool.submit.return_value, result)
@mock.patch('glance.async_.ThreadPoolModel.get_threadpool_executor_class')
def test_base_model_init_with_size(self, mock_gte):
mock_gte.return_value.__name__ = 'TestModel'
with mock.patch.object(glance.async_, 'LOG') as mock_log:
glance.async_.ThreadPoolModel(123)
mock_log.debug.assert_called_once_with(
'Creating threadpool model %r with size %i',
'TestModel', 123)
mock_gte.return_value.assert_called_once_with(123)
def test_set_threadpool_model_native(self):
glance.async_.set_threadpool_model('native')
self.assertEqual(glance.async_.NativeThreadPoolModel,
glance.async_._THREADPOOL_MODEL)
def test_set_threadpool_model_eventlet(self):
glance.async_.set_threadpool_model('eventlet')
self.assertEqual(glance.async_.EventletThreadPoolModel,
glance.async_._THREADPOOL_MODEL)
def test_set_threadpool_model_unknown(self):
# Unknown threadpool models are not tolerated
self.assertRaises(RuntimeError,
glance.async_.set_threadpool_model,
'danthread9000')
def test_set_threadpool_model_again(self):
# Setting the model to the same thing is fine
glance.async_.set_threadpool_model('native')
glance.async_.set_threadpool_model('native')
def test_set_threadpool_model_different(self):
glance.async_.set_threadpool_model('native')
# The model cannot be switched at runtime
self.assertRaises(RuntimeError,
glance.async_.set_threadpool_model,
'eventlet')
def test_set_threadpool_model_log(self):
with mock.patch.object(glance.async_, 'LOG') as mock_log:
glance.async_.set_threadpool_model('eventlet')
mock_log.info.assert_called_once_with(
'Threadpool model set to %r', 'EventletThreadPoolModel')
def test_get_threadpool_model(self):
glance.async_.set_threadpool_model('native')
self.assertEqual(glance.async_.NativeThreadPoolModel,
glance.async_.get_threadpool_model())
def test_get_threadpool_model_unset(self):
# If the model is not set, we get an AssertionError
self.assertRaises(AssertionError,
glance.async_.get_threadpool_model)

View File

@ -15,10 +15,12 @@
from unittest import mock
import futurist
import glance_store
from oslo_config import cfg
from taskflow import engines
import glance.async_
from glance.async_ import taskflow_executor
from glance.common.scripts.image_import import main as image_import
from glance import domain
@ -31,6 +33,10 @@ TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
class TestTaskExecutor(test_utils.BaseTestCase):
def setUp(self):
# NOTE(danms): Makes sure that we have a model set to something
glance.async_._THREADPOOL_MODEL = None
glance.async_.set_threadpool_model('eventlet')
super(TestTaskExecutor, self).setUp()
glance_store.register_opts(CONF)
@ -68,6 +74,15 @@ class TestTaskExecutor(test_utils.BaseTestCase):
self.image_repo,
self.image_factory)
def test_fetch_an_executor_parallel(self):
self.config(engine_mode='parallel', group='taskflow_executor')
pool = self.executor._fetch_an_executor()
self.assertIsInstance(pool, futurist.GreenThreadPoolExecutor)
def test_fetch_an_executor_serial(self):
pool = self.executor._fetch_an_executor()
self.assertIsNone(pool)
def test_begin_processing(self):
with mock.patch.object(engines, 'load') as load_mock:
engine = mock.Mock()

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2020, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from glance.api import common
from glance.common import wsgi_app
from glance.tests import utils as test_utils
class TestWsgiAppInit(test_utils.BaseTestCase):
@mock.patch('glance.common.config.load_paste_app')
@mock.patch('glance.async_.set_threadpool_model')
@mock.patch('glance.common.wsgi_app._get_config_files')
def test_wsgi_init_sets_thread_settings(self, mock_config_files,
mock_set_model,
mock_load):
mock_config_files.return_value = []
self.config(task_pool_threads=123, group='wsgi')
common.DEFAULT_POOL_SIZE = 1024
wsgi_app.init_app()
# Make sure we declared the system threadpool model as native
mock_set_model.assert_called_once_with('native')
# Make sure we set the default pool size
self.assertEqual(123, common.DEFAULT_POOL_SIZE)
mock_load.assert_called_once_with('glance-api')

View File

@ -14,7 +14,6 @@
# under the License.
import datetime
import eventlet
import hashlib
import os
from unittest import mock
@ -726,20 +725,21 @@ class TestImagesController(base.IsolatedUnitTest):
self.controller.import_image, request, UUID4,
{'method': {'name': 'glance-direct'}})
def test_image_import_raises_bad_request(self):
@mock.patch('glance.api.common.get_thread_pool')
def test_image_import_raises_bad_request(self, mock_gpt):
request = unit_test_utils.get_fake_request()
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
mock_get.return_value = FakeImage(status='uploading')
# NOTE(abhishekk): Due to
# https://bugs.launchpad.net/glance/+bug/1712463 taskflow is not
# executing. Once it is fixed instead of mocking spawn_n method
# executing. Once it is fixed instead of mocking spawn method
# we should mock execute method of _ImportToStore task.
with mock.patch.object(eventlet.GreenPool, 'spawn_n',
side_effect=ValueError):
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.import_image, request, UUID4,
{'method': {'name': 'glance-direct'}})
mock_gpt.return_value.spawn.side_effect = ValueError
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.import_image, request, UUID4,
{'method': {'name': 'glance-direct'}})
self.assertTrue(mock_gpt.return_value.spawn.called)
def test_image_import_invalid_uri_filtering(self):
request = unit_test_utils.get_fake_request()
@ -2935,7 +2935,10 @@ class TestImagesController(base.IsolatedUnitTest):
pos = self.controller._get_locations_op_pos('1', None, True)
self.assertIsNone(pos)
def test_image_import(self):
@mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task')
@mock.patch.object(glance.domain.TaskExecutorFactory, 'new_task_executor')
@mock.patch('glance.api.common.get_thread_pool')
def test_image_import(self, mock_gtp, mock_nte, mock_nt):
request = unit_test_utils.get_fake_request()
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
@ -2945,6 +2948,12 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual(UUID4, output)
# Make sure we grabbed a thread pool, and that we asked it
# to spawn the task's run method with it.
mock_gtp.assert_called_once_with('tasks_pool')
mock_gtp.return_value.spawn.assert_called_once_with(
mock_nt.return_value.run, mock_nte.return_value)
@mock.patch.object(glance.domain.TaskFactory, 'new_task')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
def test_image_import_not_allowed(self, mock_get, mock_new_task):

View File

@ -293,11 +293,12 @@ class TestTasksController(test_utils.BaseTestCase):
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.get, request, UUID4)
@mock.patch('glance.api.common.get_thread_pool')
@mock.patch.object(glance.gateway.Gateway, 'get_task_factory')
@mock.patch.object(glance.gateway.Gateway, 'get_task_executor_factory')
@mock.patch.object(glance.gateway.Gateway, 'get_task_repo')
def test_create(self, mock_get_task_repo, mock_get_task_executor_factory,
mock_get_task_factory):
mock_get_task_factory, mock_get_thread_pool):
# setup
request = unit_test_utils.get_fake_request()
task = {
@ -333,6 +334,12 @@ class TestTasksController(test_utils.BaseTestCase):
self.assertEqual(
1, get_task_executor_factory.new_task_executor.call_count)
# Make sure that we spawned the task's run method
mock_get_thread_pool.assert_called_once_with('tasks_pool')
mock_get_thread_pool.return_value.spawn.assert_called_once_with(
new_task.run,
get_task_executor_factory.new_task_executor.return_value)
@mock.patch('glance.common.scripts.utils.get_image_data_iter')
@mock.patch('glance.common.scripts.utils.validate_location_uri')
def test_create_with_live_time(self, mock_validate_location_uri,