Merge "No need to have async executor fetching be a contextmanager"

This commit is contained in:
Jenkins 2016-02-09 22:20:52 +00:00 committed by Gerrit Code Review
commit d64fd960d6

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import futurist
from oslo_config import cfg
from oslo_log import log as logging
@ -64,16 +62,19 @@ class TaskExecutor(glance.async.TaskExecutor):
super(TaskExecutor, self).__init__(context, task_repo, image_repo,
image_factory)
@contextlib.contextmanager
def _executor(self):
@staticmethod
def _fetch_an_executor():
if CONF.taskflow_executor.engine_mode != 'parallel':
yield None
return None
else:
max_workers = CONF.taskflow_executor.max_workers
try:
yield futurist.GreenThreadPoolExecutor(max_workers=max_workers)
return futurist.GreenThreadPoolExecutor(
max_workers=max_workers)
except RuntimeError:
yield futurist.ThreadPoolExecutor(max_workers=max_workers)
# NOTE(harlowja): I guess eventlet isn't being made
# useable, well just use native threads then (or try to).
return futurist.ThreadPoolExecutor(max_workers=max_workers)
def _get_flow(self, task):
try:
@ -125,16 +126,14 @@ class TaskExecutor(glance.async.TaskExecutor):
return
flow = self._get_flow(task)
executor = self._fetch_an_executor()
try:
with self._executor() as executor:
engine = engines.load(
flow,
engine=CONF.taskflow_executor.engine_mode,
executor=executor,
max_workers=CONF.taskflow_executor.max_workers)
with llistener.DynamicLoggingListener(engine, log=LOG):
engine.run()
engine = engines.load(
flow,
engine=CONF.taskflow_executor.engine_mode, executor=executor,
max_workers=CONF.taskflow_executor.max_workers)
with llistener.DynamicLoggingListener(engine, log=LOG):
engine.run()
except Exception as exc:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Failed to execute task %(task_id)s: %(exc)s') %
@ -144,3 +143,6 @@ class TaskExecutor(glance.async.TaskExecutor):
# task failure message.
task.fail(_('Task failed due to Internal Error'))
self.task_repo.save(task)
finally:
if executor is not None:
executor.shutdown()