Remove the eventlet executor

We added the eventlet executor waiting for taskflow to land and be ready
for us to consume it. Now that we have it, it's time to remove the
eventlet executor in favor of taskflow's parallel executors.

DocImpact
UpgradeImpact

Partially-implements blueprint: new-upload-workflow

Change-Id: I220a14b2a92949772d5322c9947c42e892cfdbfa
This commit is contained in:
Flavio Percoco 2015-02-25 10:02:50 +01:00
parent 578f9792ba
commit ae3135e1d6
10 changed files with 71 additions and 141 deletions

@ -1358,21 +1358,24 @@ the ``failure`` state.
Optional. Default: ``48``
The config value ``task_executor`` is used to determine which executor
should be used by the Glance service to process the task. Options include
``eventlet`` and ``taskflow``.
should be used by the Glance service to process the task. The currently
available implementation is: ``taskflow``.
* ``task_executor=<executor_type>``
Optional. Default: ``eventlet``
Optional. Default: ``taskflow``
The config value ``eventlet_executor_pool_size`` is used to configure the
eventlet task executor. It sets the maximum on the number of threads which can
be spun up at any given point of time, that are used for the execution of
Glance Tasks.
The ``taskflow`` engine has its own set of configuration options,
under the ``taskflow_executor`` section, that can be tuned to improve
the task execution process. Among the available options, you may find
``engine_mode`` and ``max_workers``. The former allows for selecting
an execution model and the available options are ``serial``,
``parallel`` and ``worker-based``. The ``max_workers`` option,
instead, allows for controlling the number of workers that will be
instantiated per executor instance.
* ``eventlet_executor_pool_size=<Size_of_pool_in_int>``
Optional. Default: ``1000``
The default value for the ``engine_mode`` is ``parallel``, whereas
the default number of ``max_workers`` is ``10``.
Configuring Glance performance profiling
----------------------------------------

@ -450,13 +450,12 @@ revocation_cache_time = 10
# task_time_to_live = 48
# Specifies which task executor to be used to run the task scripts.
# The default value for task_executor is eventlet.
# task_executor = eventlet
# Existing but disabled executors
# taskflow
# The default value for task_executor is taskflow.
# task_executor = taskflow
# Specifies the maximum number of eventlet threads which can be spun up by
# the eventlet based task executor to perform execution of Glance tasks.
# DEPRECATED: Use [taskflow_executor]/max_workers instead.
# eventlet_executor_pool_size = 1000
[taskflow_executor]
@ -465,8 +464,8 @@ revocation_cache_time = 10
# The number of parallel activities executed at the same time by
# the engine. The value can be greater than one when the engine mode is
# 'parallel'.
#max_workers = 1
# 'parallel', otherwise this value will be ignored.
#max_workers = 10
[glance_store]
# List of which store classes and store class locations are

@ -1,61 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
import glance.async
import glance.common.scripts as scripts
from glance import i18n
import glance.openstack.common.log as logging
_LI = i18n._LI
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('eventlet_executor_pool_size', 'glance.common.config',
group='task')
_MAX_EXECUTOR_THREADS = CONF.task.eventlet_executor_pool_size
_THREAD_POOL = None
class TaskExecutor(glance.async.TaskExecutor):
def __init__(self, context, task_repo, image_repo, image_factory):
super(TaskExecutor, self).__init__(context, task_repo, image_repo,
image_factory)
if _THREAD_POOL is None:
self._set_gobal_threadpool_if_none()
@lockutils.synchronized("tasks_eventlet_pool")
def _set_gobal_threadpool_if_none(self):
global _THREAD_POOL
if _THREAD_POOL is None:
_THREAD_POOL = eventlet.GreenPool(size=_MAX_EXECUTOR_THREADS)
def _run(self, task_id, task_type):
LOG.info(_LI('Eventlet executor picked up the execution of task ID '
'%(task_id)s of task type '
'%(task_type)s') % {'task_id': task_id,
'task_type': task_type})
_THREAD_POOL.spawn_n(scripts.run_task,
task_id,
task_type,
self.context,
self.task_repo,
self.image_repo,
self.image_factory)

@ -13,12 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_config import cfg
from oslo_utils import excutils
from taskflow import engines
from taskflow.listeners import logging as llistener
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
import glance.async
import glance.common.scripts as scripts
@ -30,18 +34,21 @@ _LI = i18n._LI
_LE = i18n._LE
LOG = logging.getLogger(__name__)
_deprecated_opt = cfg.DeprecatedOpt('eventlet_executor_pool_size',
group='task')
taskflow_executor_opts = [
cfg.StrOpt('engine_mode',
default='serial',
default='parallel',
choices=('serial', 'parallel'),
help=_("The mode in which the engine will run. "
"Can be 'serial' or 'parallel'.")),
cfg.IntOpt('max_workers',
default=1,
default=10,
help=_("The number of parallel activities executed at the "
"same time by the engine. The value can be greater "
"than one when the engine mode is 'parallel'."))
"than one when the engine mode is 'parallel'."),
deprecated_opts=[_deprecated_opt])
]
@ -83,6 +90,17 @@ class TaskExecutor(glance.async.TaskExecutor):
super(TaskExecutor, self).__init__(context, task_repo, image_repo,
image_factory)
@contextlib.contextmanager
def _executor(self):
if CONF.taskflow_executor.engine_mode != 'parallel':
yield None
else:
max_workers = CONF.taskflow_executor.max_workers
if eventlet_utils.EVENTLET_AVAILABLE:
yield futures.GreenThreadPoolExecutor(max_workers=max_workers)
else:
yield futures.ThreadPoolExecutor(max_workers=max_workers)
def _run(self, task_id, task_type):
LOG.info(_LI('Taskflow executor picked up the execution of task ID '
'%(task_id)s of task type '
@ -93,9 +111,11 @@ class TaskExecutor(glance.async.TaskExecutor):
self.image_repo, self.image_factory)
)
try:
engine = engines.load(flow, self.engine_conf, **self.engine_kwargs)
with llistener.DynamicLoggingListener(engine, log=LOG):
engine.run()
with self._executor() as executor:
engine = engines.load(flow, self.engine_conf,
executor=executor, **self.engine_kwargs)
with llistener.DynamicLoggingListener(engine, log=LOG):
engine.run()
except Exception as exc:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Failed to execute task %(task_id)s: %(exc)s') %

@ -67,14 +67,9 @@ task_opts = [
deprecated_opts=[cfg.DeprecatedOpt('task_time_to_live',
group='DEFAULT')]),
cfg.StrOpt('task_executor',
default='eventlet',
default='taskflow',
help=_("Specifies which task executor to be used to run the "
"task scripts.")),
cfg.IntOpt('eventlet_executor_pool_size',
default=1000,
help=_("Specifies the maximum number of eventlet threads which "
"can be spun up by the eventlet based task executor to "
"perform execution of Glance tasks.")),
]
manage_opts = [
cfg.BoolOpt('db_enforce_mysql_charset',

@ -461,9 +461,17 @@ class TaskExecutorFactory(object):
def new_task_executor(self, context):
try:
# NOTE(flaper87): Backwards compatibility layer.
# It'll allow us to provide a deprecation path to
# users that are currently consuming the `eventlet`
# executor.
task_executor = CONF.task.task_executor
if task_executor == 'eventlet':
task_executor = 'taskflow'
executor_cls = ('glance.async.%s_executor.'
'TaskExecutor' % CONF.task.task_executor)
LOG.debug("Loading %s executor" % CONF.task.task_executor)
'TaskExecutor' % task_executor)
LOG.debug("Loading %s executor" % task_executor)
executor = importutils.import_class(executor_cls)
return executor(context,
self.task_repo,

@ -159,13 +159,13 @@ class TestTasksApi(base.ApiTest):
self.assertIsNotNone(data)
self.assertEqual(1, len(data['tasks']))
# NOTE(venkatesh) find a way to get expected_keys from tasks controller
expected_keys = set(['id', 'type', 'owner', 'status',
expected_keys = set(['id', 'expires_at', 'type', 'owner', 'status',
'created_at', 'updated_at', 'self', 'schema'])
task = data['tasks'][0]
self.assertEqual(expected_keys, set(task.keys()))
self.assertEqual(req_input['type'], task['type'])
self.assertEqual(task_owner, task['owner'])
self.assertEqual('processing', task['status'])
self.assertEqual('success', task['status'])
self.assertIsNotNone(task['created_at'])
self.assertIsNotNone(task['updated_at'])

@ -1,47 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from glance.async import eventlet_executor
import glance.tests.utils as test_utils
class TestTaskExecutor(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskExecutor, self).setUp()
self.context = mock.Mock()
self.task_repo = mock.Mock()
self.image_repo = mock.Mock()
self.image_factory = mock.Mock()
self.executor = eventlet_executor.TaskExecutor(
self.context,
self.task_repo,
self.image_repo,
self.image_factory)
def test_begin_processing(self):
task_id = mock.ANY
task = mock.Mock()
task.type = mock.ANY
with mock.patch.object(eventlet_executor.TaskExecutor,
'_run') as mock_run:
self.task_repo.get.return_value = task
self.executor.begin_processing(task_id)
# assert the call
mock_run.assert_called_once_with(task_id, task.type)

@ -23,6 +23,7 @@ import oslo_utils.importutils
from oslo_utils import timeutils
import glance.async
from glance.async import taskflow_executor
from glance.common import exception
from glance import domain
import glance.tests.utils as test_utils
@ -549,3 +550,16 @@ class TestTaskExecutorFactory(test_utils.BaseTestCase):
self.assertRaises(ImportError,
task_executor_factory.new_task_executor,
context)
def test_new_task_eventlet_backwards_compatibility(self):
context = mock.MagicMock()
self.config(task_executor='eventlet', group='task')
task_executor_factory = domain.TaskExecutorFactory(self.task_repo,
self.image_repo,
self.image_factory)
# NOTE(flaper87): "eventlet" executor. short name to avoid > 79.
te_evnt = task_executor_factory.new_task_executor(context)
self.assertIsInstance(te_evnt, taskflow_executor.TaskExecutor)

@ -140,7 +140,6 @@ class OptsTestCase(utils.BaseTestCase):
'disk_formats',
'task_time_to_live',
'task_executor',
'eventlet_executor_pool_size',
'store_type_preference',
'flavor',
'config_file',