Merge "Add back a 'eventlet_utils' helper utility module"

This commit is contained in:
Jenkins
2015-01-19 03:05:18 +00:00
committed by Gerrit Code Review
14 changed files with 70 additions and 32 deletions

View File

@@ -18,6 +18,11 @@ Deprecation
.. automodule:: taskflow.utils.deprecation .. automodule:: taskflow.utils.deprecation
Eventlet
~~~~~~~~
.. automodule:: taskflow.utils.eventlet_utils
Kazoo Kazoo
~~~~~ ~~~~~

View File

@@ -25,17 +25,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir)) os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
try:
import eventlet # noqa
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
from taskflow import engines from taskflow import engines
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf from taskflow.patterns import unordered_flow as uf
from taskflow import task from taskflow import task
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import eventlet_utils
# INTRO: This is the defacto hello world equivalent for taskflow; it shows how # INTRO: This is the defacto hello world equivalent for taskflow; it shows how
@@ -86,7 +81,7 @@ song.add(PrinterTask("conductor@begin",
show_name=False, inject={'output': "*dong*"})) show_name=False, inject={'output': "*dong*"}))
# Run in parallel using eventlet green threads... # Run in parallel using eventlet green threads...
if EVENTLET_AVAILABLE: if eventlet_utils.EVENTLET_AVAILABLE:
with futures.GreenThreadPoolExecutor() as executor: with futures.GreenThreadPoolExecutor() as executor:
e = engines.load(song, executor=executor, engine='parallel') e = engines.load(song, executor=executor, engine='parallel')
e.run() e.run()

View File

@@ -33,7 +33,7 @@ from taskflow import engines
from taskflow.patterns import unordered_flow as uf from taskflow.patterns import unordered_flow as uf
from taskflow import task from taskflow import task
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import async_utils from taskflow.utils import eventlet_utils
# INTRO: This example walks through a miniature workflow which does a parallel # INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or # table modification where each row in the table gets adjusted by a thread, or
@@ -97,7 +97,7 @@ def main():
f = make_flow(tbl) f = make_flow(tbl)
# Now run it (using the specified executor)... # Now run it (using the specified executor)...
if async_utils.EVENTLET_AVAILABLE: if eventlet_utils.EVENTLET_AVAILABLE:
executor = futures.GreenThreadPoolExecutor(max_workers=5) executor = futures.GreenThreadPoolExecutor(max_workers=5)
else: else:
executor = futures.ThreadPoolExecutor(max_workers=5) executor = futures.ThreadPoolExecutor(max_workers=5)

View File

@@ -39,7 +39,7 @@ from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow import task from taskflow import task
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import async_utils from taskflow.utils import eventlet_utils
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa import example_utils as eu # noqa
@@ -238,7 +238,7 @@ with eu.get_backend() as backend:
# Set up how we want our engine to run, serial, parallel... # Set up how we want our engine to run, serial, parallel...
executor = None executor = None
if async_utils.EVENTLET_AVAILABLE: if eventlet_utils.EVENTLET_AVAILABLE:
executor = futures.GreenThreadPoolExecutor(5) executor = futures.GreenThreadPoolExecutor(5)
# Create/fetch a logbook that will track the workflows work. # Create/fetch a logbook that will track the workflows work.

View File

@@ -38,7 +38,7 @@ from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence.backends.sqlalchemy import models
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow.types import failure from taskflow.types import failure
from taskflow.utils import async_utils from taskflow.utils import eventlet_utils
from taskflow.utils import misc from taskflow.utils import misc
@@ -250,7 +250,7 @@ class SQLAlchemyBackend(base.Backend):
engine_args.update(conf.pop('engine_args', {})) engine_args.update(conf.pop('engine_args', {}))
engine = sa.create_engine(sql_connection, **engine_args) engine = sa.create_engine(sql_connection, **engine_args)
checkin_yield = conf.pop('checkin_yield', checkin_yield = conf.pop('checkin_yield',
async_utils.EVENTLET_AVAILABLE) eventlet_utils.EVENTLET_AVAILABLE)
if _as_bool(checkin_yield): if _as_bool(checkin_yield):
sa.event.listen(engine, 'checkin', _thread_yield) sa.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in e_url.drivername: if 'mysql' in e_url.drivername:

View File

@@ -23,7 +23,7 @@ from taskflow.persistence import backends
from taskflow import test from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.types import futures as futures from taskflow.types import futures as futures
from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as pu from taskflow.utils import persistence_utils as pu
@@ -61,7 +61,7 @@ class ParallelCreationTest(test.TestCase):
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
executor.ParallelProcessTaskExecutor) executor.ParallelProcessTaskExecutor)
@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
def test_green_executor_creation(self): def test_green_executor_creation(self):
with futures.GreenThreadPoolExecutor(1) as e: with futures.GreenThreadPoolExecutor(1) as e:
eng = self._create_engine(executor=e) eng = self._create_engine(executor=e)

View File

@@ -34,7 +34,7 @@ from taskflow.tests import utils
from taskflow.types import failure from taskflow.types import failure
from taskflow.types import futures from taskflow.types import futures
from taskflow.types import graph as gr from taskflow.types import graph as gr
from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
@@ -652,7 +652,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
executor.shutdown(wait=True) executor.shutdown(wait=True)
@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(EngineTaskTest, class ParallelEngineWithEventletTest(EngineTaskTest,
EngineLinearFlowTest, EngineLinearFlowTest,
EngineParallelFlowTest, EngineParallelFlowTest,

View File

@@ -23,13 +23,13 @@ import testtools
from taskflow import test from taskflow import test
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
try: try:
from eventlet.green import threading as greenthreading from eventlet.green import threading as greenthreading
from eventlet.green import time as greentime from eventlet.green import time as greentime
EVENTLET_AVAILABLE = True
except ImportError: except ImportError:
EVENTLET_AVAILABLE = False pass
def _noop(): def _noop():
@@ -194,7 +194,7 @@ class SynchronousExecutorTest(test.TestCase, _FuturesTestMixin):
pass pass
@testtools.skipIf(not EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin): class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
def _make_executor(self, max_workers): def _make_executor(self, max_workers):
return futures.GreenThreadPoolExecutor(max_workers=max_workers) return futures.GreenThreadPoolExecutor(max_workers=max_workers)

View File

@@ -27,7 +27,7 @@ from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.types import failure from taskflow.types import failure
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu
class FailingRetry(retry.Retry): class FailingRetry(retry.Retry):
@@ -980,7 +980,7 @@ class ParallelEngineWithThreadsTest(RetryTest,
max_workers=self._EXECUTOR_WORKERS) max_workers=self._EXECUTOR_WORKERS)
@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(RetryTest, test.TestCase): class ParallelEngineWithEventletTest(RetryTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):

View File

@@ -23,7 +23,7 @@ from taskflow import states
from taskflow import test from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu
class SuspendingListener(utils.CaptureListener): class SuspendingListener(utils.CaptureListener):
@@ -212,7 +212,7 @@ class ParallelEngineWithThreadsTest(SuspendTest, test.TestCase):
max_workers=self._EXECUTOR_WORKERS) max_workers=self._EXECUTOR_WORKERS)
@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(SuspendTest, test.TestCase): class ParallelEngineWithEventletTest(SuspendTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):

View File

@@ -19,6 +19,7 @@ import testtools
from taskflow import test from taskflow import test
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import async_utils as au from taskflow.utils import async_utils as au
from taskflow.utils import eventlet_utils as eu
class WaitForAnyTestsMixin(object): class WaitForAnyTestsMixin(object):
@@ -52,7 +53,7 @@ class WaitForAnyTestsMixin(object):
self.assertIs(done.pop(), f2) self.assertIs(done.pop(), f2)
@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class AsyncUtilsEventletTest(test.TestCase, class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin): WaitForAnyTestsMixin):
def _make_executor(self, max_workers): def _make_executor(self, max_workers):

View File

@@ -27,11 +27,11 @@ try:
from eventlet import greenpool from eventlet import greenpool
from eventlet import patcher as greenpatcher from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue from eventlet import queue as greenqueue
EVENTLET_AVAILABLE = True
except ImportError: except ImportError:
EVENTLET_AVAILABLE = False pass
from taskflow.types import timing from taskflow.types import timing
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
@@ -245,7 +245,8 @@ class _GreenWorker(object):
class GreenFuture(Future): class GreenFuture(Future):
def __init__(self): def __init__(self):
super(GreenFuture, self).__init__() super(GreenFuture, self).__init__()
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future' eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green'
' future'))
# NOTE(harlowja): replace the built-in condition with a greenthread # NOTE(harlowja): replace the built-in condition with a greenthread
# compatible one so that when getting the result of this future the # compatible one so that when getting the result of this future the
# functions will correctly yield to eventlet. If this is not done then # functions will correctly yield to eventlet. If this is not done then
@@ -266,7 +267,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
""" """
def __init__(self, max_workers=1000): def __init__(self, max_workers=1000):
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor' eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green'
' executor'))
if max_workers <= 0: if max_workers <= 0:
raise ValueError("Max workers must be greater than zero") raise ValueError("Max workers must be greater than zero")
self._max_workers = max_workers self._max_workers = max_workers

View File

@@ -19,11 +19,11 @@ from concurrent.futures import _base
try: try:
from eventlet.green import threading as greenthreading from eventlet.green import threading as greenthreading
EVENTLET_AVAILABLE = True
except ImportError: except ImportError:
EVENTLET_AVAILABLE = False pass
from taskflow.types import futures from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
_DONE_STATES = frozenset([ _DONE_STATES = frozenset([
@@ -94,7 +94,8 @@ def _partition_futures(fs):
def _wait_for_any_green(fs, timeout=None): def _wait_for_any_green(fs, timeout=None):
assert EVENTLET_AVAILABLE, 'eventlet is needed to wait on green futures' eu.check_for_eventlet(RuntimeError('Eventlet is needed to wait on'
' green futures'))
with _base._AcquireFutures(fs): with _base._AcquireFutures(fs):
done, not_done = _partition_futures(fs) done, not_done = _partition_futures(fs)

View File

@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
import eventlet as _eventlet # noqa
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
def check_for_eventlet(exc=None):
"""Check if eventlet is available and if not raise a runtime error.
:param exc: exception to raise instead of raising a runtime error
:type exc: exception
"""
if not EVENTLET_AVAILABLE:
if exc is None:
raise RuntimeError('Eventlet is not current available')
else:
raise exc