From 16a5431c665adadbfcb9909cf3a73334d796c19a Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Mon, 20 Jul 2020 20:49:34 -0700 Subject: [PATCH] Make glance-api able to do async tasks in WSGI mode This teaches glance-api how to do async threading things when it is running in pure-WSGI mode. In order to do that, a refactoring of things that currently depend on eventlet is required. It adds a [wsgi]/task_pool_threads configuration knob, which is used in the case of pure-WSGI and native threads to constrain the number of threads in that pool (and thus the task parallelism). This will allow tuning by the operator, but also lets us default that to just a single thread in the backport of these fixes so that we can avoid introducing a new larger footprint in the backport unexpectedly. Partial-Bug: #1888713 Depends-On: https://review.opendev.org/#/c/742047/ Change-Id: Ie15028b75fb8518ec2b0c0c0386d21782166f759 --- .zuul.yaml | 17 +++ glance/api/common.py | 25 +++- glance/api/v2/images.py | 4 +- glance/api/v2/tasks.py | 4 +- glance/async_/__init__.py | 107 ++++++++++++++++++ glance/async_/taskflow_executor.py | 10 +- glance/cmd/api.py | 4 + glance/common/config.py | 16 +++ glance/common/wsgi_app.py | 13 +++ glance/tests/__init__.py | 4 + glance/tests/functional/__init__.py | 8 ++ glance/tests/unit/api/test_cmd.py | 6 +- glance/tests/unit/api/test_common.py | 25 ++++ glance/tests/unit/async_/test_async.py | 88 ++++++++++++++ .../unit/async_/test_taskflow_executor.py | 15 +++ glance/tests/unit/common/test_wsgi_app.py | 39 +++++++ glance/tests/unit/v2/test_images_resource.py | 27 +++-- glance/tests/unit/v2/test_tasks_resource.py | 9 +- 18 files changed, 392 insertions(+), 29 deletions(-) create mode 100644 glance/tests/unit/common/test_wsgi_app.py diff --git a/.zuul.yaml b/.zuul.yaml index 5040a1e79c..cbbe37896b 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -228,6 +228,21 @@ inject: | "glance_devstack_test":"doyouseeme?" +- job: + name: tempest-integrated-storage-wsgi-import + parent: tempest-integrated-storage + description: | + The regular tempest-integrated-storage job but with glance in wsgi mode + vars: + devstack_localrc: + GLANCE_STANDALONE: False + GLANCE_USE_IMPORT_WORKFLOW: True + devstack_local_conf: + post-config: + $GLANCE_API_CONF: + DEFAULT: + enabled_import_methods: "[\"copy-image\", \"glance-direct\"]" + - project: templates: - check-requirements @@ -261,6 +276,8 @@ irrelevant-files: *tempest-irrelevant-files - tempest-integrated-storage-import-workflow: irrelevant-files: *tempest-irrelevant-files + - tempest-integrated-storage-wsgi-import: + irrelevant-files: *tempest-irrelevant-files - grenade: irrelevant-files: *tempest-irrelevant-files - tempest-ipv6-only: diff --git a/glance/api/common.py b/glance/api/common.py index 30efcb7b06..f4d967d078 100644 --- a/glance/api/common.py +++ b/glance/api/common.py @@ -21,8 +21,8 @@ from oslo_log import log as logging from oslo_utils import excutils from oslo_utils import units +import glance.async_ from glance.common import exception -from glance.common import wsgi from glance.i18n import _, _LE, _LW LOG = logging.getLogger(__name__) @@ -197,20 +197,33 @@ def memoize(lock_name): return memoizer_wrapper -def get_thread_pool(lock_name, size=1024): - """Initializes eventlet thread pool. +# NOTE(danms): This is the default pool size that will be used for +# the get_thread_pool() cache wrapper below. This is a global here +# because it needs to be overridden for the pure-wsgi mode in +# wsgi_app.py where native threads are used. +DEFAULT_POOL_SIZE = 1024 + + +def get_thread_pool(lock_name, size=None): + """Initializes thread pool. If thread pool is present in cache, then returns it from cache else create new pool, stores it in cache and return newly created pool. @param lock_name: Name of the lock. - @param size: Size of eventlet pool. + @param size: Size of pool. - @return: eventlet pool + @return: ThreadPoolModel """ + + if size is None: + size = DEFAULT_POOL_SIZE + @memoize(lock_name) def _get_thread_pool(): - return wsgi.get_asynchronous_eventlet_pool(size=size) + threadpool_cls = glance.async_.get_threadpool_model() + LOG.debug('Initializing named threadpool %r', lock_name) + return threadpool_cls(size) return _get_thread_pool diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index c18e635572..9c4e08b870 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -216,8 +216,8 @@ class ImagesController(object): task_input=task_input) task_repo.add(import_task) task_executor = executor_factory.new_task_executor(req.context) - pool = common.get_thread_pool("tasks_eventlet_pool") - pool.spawn_n(import_task.run, task_executor) + pool = common.get_thread_pool("tasks_pool") + pool.spawn(import_task.run, task_executor) except exception.Forbidden as e: LOG.debug("User not permitted to create image import task.") raise webob.exc.HTTPForbidden(explanation=e.msg) diff --git a/glance/api/v2/tasks.py b/glance/api/v2/tasks.py index 31f819c52f..871a80320a 100644 --- a/glance/api/v2/tasks.py +++ b/glance/api/v2/tasks.py @@ -78,8 +78,8 @@ class TasksController(object): task_input=task['input']) task_repo.add(new_task) task_executor = executor_factory.new_task_executor(req.context) - pool = common.get_thread_pool("tasks_eventlet_pool") - pool.spawn_n(new_task.run, task_executor) + pool = common.get_thread_pool("tasks_pool") + pool.spawn(new_task.run, task_executor) except exception.Forbidden as e: msg = (_LW("Forbidden to create task. Reason: %(reason)s") % {'reason': encodeutils.exception_to_unicode(e)}) diff --git a/glance/async_/__init__.py b/glance/async_/__init__.py index cd141de272..95ee4a7996 100644 --- a/glance/async_/__init__.py +++ b/glance/async_/__init__.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist from oslo_log import log as logging from glance.i18n import _LE @@ -75,3 +76,109 @@ class TaskExecutor(object): LOG.error(msg) task.fail(_LE("Internal error occurred while trying to process task.")) self.task_repo.save(task) + + +class ThreadPoolModel(object): + """Base class for an abstract ThreadPool. + + Do not instantiate this directly, use one of the concrete + implementations. + """ + + DEFAULTSIZE = 1 + + @staticmethod + def get_threadpool_executor_class(): + """Returns a futurist.ThreadPoolExecutor class.""" + pass + + def __init__(self, size=None): + if size is None: + size = self.DEFAULTSIZE + + threadpool_cls = self.get_threadpool_executor_class() + LOG.debug('Creating threadpool model %r with size %i', + threadpool_cls.__name__, size) + self.pool = threadpool_cls(size) + + def spawn(self, fn, *args, **kwargs): + """Spawn a function with args using the thread pool.""" + LOG.debug('Spawning with %s: %s(%s, %s)' % ( + self.get_threadpool_executor_class().__name__, + fn, args, kwargs)) + return self.pool.submit(fn, *args, **kwargs) + + +class EventletThreadPoolModel(ThreadPoolModel): + """A ThreadPoolModel suitable for use with evenlet/greenthreads.""" + DEFAULTSIZE = 1024 + + @staticmethod + def get_threadpool_executor_class(): + return futurist.GreenThreadPoolExecutor + + +class NativeThreadPoolModel(ThreadPoolModel): + """A ThreadPoolModel suitable for use with native threads.""" + DEFAULTSIZE = 16 + + @staticmethod + def get_threadpool_executor_class(): + return futurist.ThreadPoolExecutor + + +_THREADPOOL_MODEL = None + + +def set_threadpool_model(thread_type): + """Set the system-wide threadpool model. + + This sets the type of ThreadPoolModel to use globally in the process. + It should be called very early in init, and only once. + + :param thread_type: A string indicating the threading type in use, + either "eventlet" or "native" + :raises: RuntimeError if the model is already set or some thread_type + other than one of the supported ones is provided. + """ + global _THREADPOOL_MODEL + + if thread_type == 'native': + model = NativeThreadPoolModel + elif thread_type == 'eventlet': + model = EventletThreadPoolModel + else: + raise RuntimeError( + ('Invalid thread type %r ' + '(must be "native" or "eventlet")') % (thread_type)) + + if _THREADPOOL_MODEL is model: + # Re-setting the same model is fine... + return + + if _THREADPOOL_MODEL is not None: + # ...changing it is not. + raise RuntimeError('Thread model is already set') + + LOG.info('Threadpool model set to %r', model.__name__) + _THREADPOOL_MODEL = model + + +def get_threadpool_model(): + """Returns the system-wide threadpool model class. + + This must be called after set_threadpool_model() whenever + some code needs to know what the threadpool implementation is. + + This may only be called after set_threadpool_model() has been + called to set the desired threading mode. If it is called before + the model is set, it will raise AssertionError. This would likely + be the case if this got run in a test before the model was + initialized, or if glance modules that use threading were imported + and run from some other code without setting the model first. + + :raises: AssertionError if the model has not yet been set. + """ + global _THREADPOOL_MODEL + assert _THREADPOOL_MODEL + return _THREADPOOL_MODEL diff --git a/glance/async_/taskflow_executor.py b/glance/async_/taskflow_executor.py index 07c5e7a92d..867986a358 100644 --- a/glance/async_/taskflow_executor.py +++ b/glance/async_/taskflow_executor.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist from oslo_config import cfg from oslo_log import log as logging from oslo_utils import encodeutils @@ -104,13 +103,8 @@ class TaskExecutor(glance.async_.TaskExecutor): return None else: max_workers = CONF.taskflow_executor.max_workers - try: - return futurist.GreenThreadPoolExecutor( - max_workers=max_workers) - except RuntimeError: - # NOTE(harlowja): I guess eventlet isn't being made - # useable, well just use native threads then (or try to). - return futurist.ThreadPoolExecutor(max_workers=max_workers) + threadpool_cls = glance.async_.get_threadpool_model() + return threadpool_cls(max_workers).pool def _get_flow(self, task): try: diff --git a/glance/cmd/api.py b/glance/cmd/api.py index dfb08ec6b1..47e433ed76 100644 --- a/glance/cmd/api.py +++ b/glance/cmd/api.py @@ -62,6 +62,7 @@ from oslo_config import cfg from oslo_log import log as logging import osprofiler.initializer +import glance.async_ from glance.common import config from glance.common import exception from glance.common import wsgi @@ -107,6 +108,9 @@ def main(): host=CONF.bind_host ) + # NOTE(danms): Configure system-wide threading model to use eventlet + glance.async_.set_threadpool_model('eventlet') + # NOTE(abhishekk): Added initialize_prefetcher KW argument to Server # object so that prefetcher object should only be initialized in case # of API service and ignored in case of registry. Once registry is diff --git a/glance/common/config.py b/glance/common/config.py index 2aa08b60ed..cc5e45ef94 100644 --- a/glance/common/config.py +++ b/glance/common/config.py @@ -566,11 +566,27 @@ Related options: * [DEFAULT]/node_staging_uri""")), ] +wsgi_opts = [ + cfg.IntOpt('task_pool_threads', + default=16, + min=1, + help=_(""" +The number of threads (per worker process) in the pool for processing +asynchronous tasks. This controls how many asynchronous tasks (i.e. for +image interoperable import) each worker can run at a time. If this is +too large, you *may* have increased memory footprint per worker and/or you +may overwhelm other system resources such as disk or outbound network +bandwidth. If this is too small, image import requests will have to wait +until a thread becomes available to begin processing.""")), +] + + CONF = cfg.CONF CONF.register_opts(paste_deploy_opts, group='paste_deploy') CONF.register_opts(image_format_opts, group='image_format') CONF.register_opts(task_opts, group='task') CONF.register_opts(common_opts) +CONF.register_opts(wsgi_opts, group='wsgi') policy.Enforcer(CONF) diff --git a/glance/common/wsgi_app.py b/glance/common/wsgi_app.py index 5dbdf6abd4..2f023912c1 100644 --- a/glance/common/wsgi_app.py +++ b/glance/common/wsgi_app.py @@ -17,6 +17,8 @@ from oslo_config import cfg from oslo_log import log as logging import osprofiler.initializer +from glance.api import common +import glance.async_ from glance.common import config from glance.common import store_utils from glance.i18n import _ @@ -71,6 +73,17 @@ def init_app(): CONF([], project='glance', default_config_files=config_files) logging.setup(CONF, "glance") + # NOTE(danms): We are running inside uwsgi or mod_wsgi, so no eventlet; + # use native threading instead. + glance.async_.set_threadpool_model('native') + + # NOTE(danms): Change the default threadpool size since we + # are dealing with native threads and not greenthreads. + # Right now, the only pool of default size is tasks_pool, + # so if others are created this will need to change to be + # more specific. + common.DEFAULT_POOL_SIZE = CONF.wsgi.task_pool_threads + if CONF.enabled_backends: if store_utils.check_reserved_stores(CONF.enabled_backends): msg = _("'os_glance_' prefix should not be used in " diff --git a/glance/tests/__init__.py b/glance/tests/__init__.py index eab8985891..4fa1ab74d6 100644 --- a/glance/tests/__init__.py +++ b/glance/tests/__init__.py @@ -30,6 +30,10 @@ if os.name == 'nt': else: eventlet.patcher.monkey_patch() +import glance.async_ +# NOTE(danms): Default to eventlet threading for tests +glance.async_.set_threadpool_model('eventlet') + # See http://code.google.com/p/python-nose/issues/detail?id=373 # The code below enables tests to work with i18n _() blocks import six.moves.builtins as __builtin__ diff --git a/glance/tests/functional/__init__.py b/glance/tests/functional/__init__.py index 6506ad6939..4abcbfb313 100644 --- a/glance/tests/functional/__init__.py +++ b/glance/tests/functional/__init__.py @@ -62,6 +62,14 @@ else: CONF = cfg.CONF +import glance.async_ +# NOTE(danms): Default to eventlet threading for tests +try: + glance.async_.set_threadpool_model('eventlet') +except RuntimeError: + pass + + @six.add_metaclass(abc.ABCMeta) class BaseServer(object): """ diff --git a/glance/tests/unit/api/test_cmd.py b/glance/tests/unit/api/test_cmd.py index b7e9dbcbf6..09ab4eff21 100644 --- a/glance/tests/unit/api/test_cmd.py +++ b/glance/tests/unit/api/test_cmd.py @@ -65,12 +65,16 @@ class TestGlanceApiCmd(test_utils.BaseTestCase): sys.argv = self.__argv_backup super(TestGlanceApiCmd, self).tearDown() + @mock.patch('glance.async_.set_threadpool_model',) @mock.patch.object(prefetcher, 'Prefetcher') - def test_supported_default_store(self, mock_prefetcher): + def test_supported_default_store(self, mock_prefetcher, mock_set_model): self.config(group='glance_store', default_store='file') glance.cmd.api.main() + # Make sure we declared the system threadpool model as eventlet + mock_set_model.assert_called_once_with('eventlet') @mock.patch.object(prefetcher, 'Prefetcher') + @mock.patch('glance.async_.set_threadpool_model', new=mock.MagicMock()) def test_worker_creation_failure(self, mock_prefetcher): failure = exc.WorkerCreationFailure(reason='test') self.mock_object(glance.common.wsgi.Server, 'start', diff --git a/glance/tests/unit/api/test_common.py b/glance/tests/unit/api/test_common.py index cf3c9f93fb..e2b5b484f8 100644 --- a/glance/tests/unit/api/test_common.py +++ b/glance/tests/unit/api/test_common.py @@ -14,6 +14,7 @@ # under the License. import testtools +from unittest import mock import webob import glance.api.common @@ -124,3 +125,27 @@ class TestSizeCheckedIter(testtools.TestCase): self.assertEqual('CD', next(checked_image)) self.assertEqual('E', next(checked_image)) self.assertRaises(exception.GlanceException, next, checked_image) + + +class TestThreadPool(testtools.TestCase): + @mock.patch('glance.async_.get_threadpool_model') + def test_get_thread_pool(self, mock_gtm): + get_thread_pool = glance.api.common.get_thread_pool + + pool1 = get_thread_pool('pool1', size=123) + get_thread_pool('pool2', size=456) + pool1a = get_thread_pool('pool1') + + # Two calls for the same pool should return the exact same thing + self.assertEqual(pool1, pool1a) + + # Only two calls to get new threadpools should have been made + mock_gtm.return_value.assert_has_calls( + [mock.call(123), mock.call(456)]) + + @mock.patch('glance.async_.get_threadpool_model') + def test_get_thread_pool_log(self, mock_gtm): + with mock.patch.object(glance.api.common, 'LOG') as mock_log: + glance.api.common.get_thread_pool('test-pool') + mock_log.debug.assert_called_once_with( + 'Initializing named threadpool %r', 'test-pool') diff --git a/glance/tests/unit/async_/test_async.py b/glance/tests/unit/async_/test_async.py index aeda49c1e4..5491da7a64 100644 --- a/glance/tests/unit/async_/test_async.py +++ b/glance/tests/unit/async_/test_async.py @@ -16,6 +16,7 @@ from unittest import mock +import futurist import glance_store as store from oslo_config import cfg from taskflow.patterns import linear_flow @@ -206,3 +207,90 @@ class TestImportTaskFlow(test_utils.BaseTestCase): for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('CopyImage', flow_comp) + + +@mock.patch('glance.async_._THREADPOOL_MODEL', new=None) +class TestSystemThreadPoolModel(test_utils.BaseTestCase): + def test_eventlet_model(self): + model_cls = glance.async_.EventletThreadPoolModel + self.assertEqual(futurist.GreenThreadPoolExecutor, + model_cls.get_threadpool_executor_class()) + + def test_native_model(self): + model_cls = glance.async_.NativeThreadPoolModel + self.assertEqual(futurist.ThreadPoolExecutor, + model_cls.get_threadpool_executor_class()) + + @mock.patch('glance.async_.ThreadPoolModel.get_threadpool_executor_class') + def test_base_model_spawn(self, mock_gte): + pool_cls = mock.MagicMock() + pool_cls.configure_mock(__name__='fake') + mock_gte.return_value = pool_cls + + model = glance.async_.ThreadPoolModel() + result = model.spawn(print, 'foo', bar='baz') + + pool = pool_cls.return_value + + # Make sure the default size was passed to the executor + pool_cls.assert_called_once_with(1) + + # Make sure we submitted the function to the executor + pool.submit.assert_called_once_with(print, 'foo', bar='baz') + + # This isn't used anywhere, but make sure we get the future + self.assertEqual(pool.submit.return_value, result) + + @mock.patch('glance.async_.ThreadPoolModel.get_threadpool_executor_class') + def test_base_model_init_with_size(self, mock_gte): + mock_gte.return_value.__name__ = 'TestModel' + with mock.patch.object(glance.async_, 'LOG') as mock_log: + glance.async_.ThreadPoolModel(123) + mock_log.debug.assert_called_once_with( + 'Creating threadpool model %r with size %i', + 'TestModel', 123) + mock_gte.return_value.assert_called_once_with(123) + + def test_set_threadpool_model_native(self): + glance.async_.set_threadpool_model('native') + self.assertEqual(glance.async_.NativeThreadPoolModel, + glance.async_._THREADPOOL_MODEL) + + def test_set_threadpool_model_eventlet(self): + glance.async_.set_threadpool_model('eventlet') + self.assertEqual(glance.async_.EventletThreadPoolModel, + glance.async_._THREADPOOL_MODEL) + + def test_set_threadpool_model_unknown(self): + # Unknown threadpool models are not tolerated + self.assertRaises(RuntimeError, + glance.async_.set_threadpool_model, + 'danthread9000') + + def test_set_threadpool_model_again(self): + # Setting the model to the same thing is fine + glance.async_.set_threadpool_model('native') + glance.async_.set_threadpool_model('native') + + def test_set_threadpool_model_different(self): + glance.async_.set_threadpool_model('native') + # The model cannot be switched at runtime + self.assertRaises(RuntimeError, + glance.async_.set_threadpool_model, + 'eventlet') + + def test_set_threadpool_model_log(self): + with mock.patch.object(glance.async_, 'LOG') as mock_log: + glance.async_.set_threadpool_model('eventlet') + mock_log.info.assert_called_once_with( + 'Threadpool model set to %r', 'EventletThreadPoolModel') + + def test_get_threadpool_model(self): + glance.async_.set_threadpool_model('native') + self.assertEqual(glance.async_.NativeThreadPoolModel, + glance.async_.get_threadpool_model()) + + def test_get_threadpool_model_unset(self): + # If the model is not set, we get an AssertionError + self.assertRaises(AssertionError, + glance.async_.get_threadpool_model) diff --git a/glance/tests/unit/async_/test_taskflow_executor.py b/glance/tests/unit/async_/test_taskflow_executor.py index 5ceb87d3e8..b31ad85011 100644 --- a/glance/tests/unit/async_/test_taskflow_executor.py +++ b/glance/tests/unit/async_/test_taskflow_executor.py @@ -15,10 +15,12 @@ from unittest import mock +import futurist import glance_store from oslo_config import cfg from taskflow import engines +import glance.async_ from glance.async_ import taskflow_executor from glance.common.scripts.image_import import main as image_import from glance import domain @@ -31,6 +33,10 @@ TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df' class TestTaskExecutor(test_utils.BaseTestCase): def setUp(self): + # NOTE(danms): Makes sure that we have a model set to something + glance.async_._THREADPOOL_MODEL = None + glance.async_.set_threadpool_model('eventlet') + super(TestTaskExecutor, self).setUp() glance_store.register_opts(CONF) @@ -68,6 +74,15 @@ class TestTaskExecutor(test_utils.BaseTestCase): self.image_repo, self.image_factory) + def test_fetch_an_executor_parallel(self): + self.config(engine_mode='parallel', group='taskflow_executor') + pool = self.executor._fetch_an_executor() + self.assertIsInstance(pool, futurist.GreenThreadPoolExecutor) + + def test_fetch_an_executor_serial(self): + pool = self.executor._fetch_an_executor() + self.assertIsNone(pool) + def test_begin_processing(self): with mock.patch.object(engines, 'load') as load_mock: engine = mock.Mock() diff --git a/glance/tests/unit/common/test_wsgi_app.py b/glance/tests/unit/common/test_wsgi_app.py new file mode 100644 index 0000000000..b58a8ac354 --- /dev/null +++ b/glance/tests/unit/common/test_wsgi_app.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2020, Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from glance.api import common +from glance.common import wsgi_app +from glance.tests import utils as test_utils + + +class TestWsgiAppInit(test_utils.BaseTestCase): + @mock.patch('glance.common.config.load_paste_app') + @mock.patch('glance.async_.set_threadpool_model') + @mock.patch('glance.common.wsgi_app._get_config_files') + def test_wsgi_init_sets_thread_settings(self, mock_config_files, + mock_set_model, + mock_load): + mock_config_files.return_value = [] + self.config(task_pool_threads=123, group='wsgi') + common.DEFAULT_POOL_SIZE = 1024 + wsgi_app.init_app() + # Make sure we declared the system threadpool model as native + mock_set_model.assert_called_once_with('native') + # Make sure we set the default pool size + self.assertEqual(123, common.DEFAULT_POOL_SIZE) + mock_load.assert_called_once_with('glance-api') diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index 762f293cd8..b8a05506e9 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -14,7 +14,6 @@ # under the License. import datetime -import eventlet import hashlib import os from unittest import mock @@ -726,20 +725,21 @@ class TestImagesController(base.IsolatedUnitTest): self.controller.import_image, request, UUID4, {'method': {'name': 'glance-direct'}}) - def test_image_import_raises_bad_request(self): + @mock.patch('glance.api.common.get_thread_pool') + def test_image_import_raises_bad_request(self, mock_gpt): request = unit_test_utils.get_fake_request() with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: mock_get.return_value = FakeImage(status='uploading') # NOTE(abhishekk): Due to # https://bugs.launchpad.net/glance/+bug/1712463 taskflow is not - # executing. Once it is fixed instead of mocking spawn_n method + # executing. Once it is fixed instead of mocking spawn method # we should mock execute method of _ImportToStore task. - with mock.patch.object(eventlet.GreenPool, 'spawn_n', - side_effect=ValueError): - self.assertRaises(webob.exc.HTTPBadRequest, - self.controller.import_image, request, UUID4, - {'method': {'name': 'glance-direct'}}) + mock_gpt.return_value.spawn.side_effect = ValueError + self.assertRaises(webob.exc.HTTPBadRequest, + self.controller.import_image, request, UUID4, + {'method': {'name': 'glance-direct'}}) + self.assertTrue(mock_gpt.return_value.spawn.called) def test_image_import_invalid_uri_filtering(self): request = unit_test_utils.get_fake_request() @@ -2935,7 +2935,10 @@ class TestImagesController(base.IsolatedUnitTest): pos = self.controller._get_locations_op_pos('1', None, True) self.assertIsNone(pos) - def test_image_import(self): + @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') + @mock.patch.object(glance.domain.TaskExecutorFactory, 'new_task_executor') + @mock.patch('glance.api.common.get_thread_pool') + def test_image_import(self, mock_gtp, mock_nte, mock_nt): request = unit_test_utils.get_fake_request() with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: @@ -2945,6 +2948,12 @@ class TestImagesController(base.IsolatedUnitTest): self.assertEqual(UUID4, output) + # Make sure we grabbed a thread pool, and that we asked it + # to spawn the task's run method with it. + mock_gtp.assert_called_once_with('tasks_pool') + mock_gtp.return_value.spawn.assert_called_once_with( + mock_nt.return_value.run, mock_nte.return_value) + @mock.patch.object(glance.domain.TaskFactory, 'new_task') @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') def test_image_import_not_allowed(self, mock_get, mock_new_task): diff --git a/glance/tests/unit/v2/test_tasks_resource.py b/glance/tests/unit/v2/test_tasks_resource.py index db634dd383..5fcee6b278 100644 --- a/glance/tests/unit/v2/test_tasks_resource.py +++ b/glance/tests/unit/v2/test_tasks_resource.py @@ -293,11 +293,12 @@ class TestTasksController(test_utils.BaseTestCase): self.assertRaises(webob.exc.HTTPNotFound, self.controller.get, request, UUID4) + @mock.patch('glance.api.common.get_thread_pool') @mock.patch.object(glance.gateway.Gateway, 'get_task_factory') @mock.patch.object(glance.gateway.Gateway, 'get_task_executor_factory') @mock.patch.object(glance.gateway.Gateway, 'get_task_repo') def test_create(self, mock_get_task_repo, mock_get_task_executor_factory, - mock_get_task_factory): + mock_get_task_factory, mock_get_thread_pool): # setup request = unit_test_utils.get_fake_request() task = { @@ -333,6 +334,12 @@ class TestTasksController(test_utils.BaseTestCase): self.assertEqual( 1, get_task_executor_factory.new_task_executor.call_count) + # Make sure that we spawned the task's run method + mock_get_thread_pool.assert_called_once_with('tasks_pool') + mock_get_thread_pool.return_value.spawn.assert_called_once_with( + new_task.run, + get_task_executor_factory.new_task_executor.return_value) + @mock.patch('glance.common.scripts.utils.get_image_data_iter') @mock.patch('glance.common.scripts.utils.validate_location_uri') def test_create_with_live_time(self, mock_validate_location_uri,