diff --git a/doc/source/types.rst b/doc/source/types.rst index 57cebaeae..84d446acf 100644 --- a/doc/source/types.rst +++ b/doc/source/types.rst @@ -29,11 +29,6 @@ FSM .. automodule:: taskflow.types.fsm -Futures -======= - -.. automodule:: taskflow.types.futures - Graph ===== @@ -45,11 +40,6 @@ Notifier .. automodule:: taskflow.types.notifier :special-members: __call__ -Periodic -======== - -.. automodule:: taskflow.types.periodic - Sets ==== diff --git a/requirements.txt b/requirements.txt index d4abcdd02..24414c6d7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,9 @@ six>=1.9.0 # Enum library made for <= python 3.3 enum34;python_version=='2.7' or python_version=='2.6' +# For async and/or periodic work +futurist>=0.1.1 # Apache-2.0 + # For reader/writer + interprocess locks. fasteners>=0.7 # Apache-2.0 diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index c8cad50ad..a3834d151 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -14,13 +14,14 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist + from taskflow.engines.action_engine.actions import base from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom from taskflow import states from taskflow.types import failure -from taskflow.types import futures LOG = logging.getLogger(__name__) @@ -46,7 +47,7 @@ class RetryAction(base.Action): def __init__(self, storage, notifier): super(RetryAction, self).__init__(storage, notifier) - self._executor = futures.SynchronousExecutor() + self._executor = futurist.SynchronousExecutor() def _get_retry_args(self, retry, addons=None): arguments = self._storage.fetch_mapped_args( diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9da9ae9db..11bc1d384 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -364,7 +364,7 @@ String (case insensitive) Executor used # # NOTE(harlowja): the reason we use the library/built-in futures is to # allow for instances of that to be detected and handled correctly, instead - # of forcing everyone to use our derivatives... + # of forcing everyone to use our derivatives (futurist or other)... _executor_cls_matchers = [ _ExecutorTypeMatch((futures.ThreadPoolExecutor,), executor.ParallelThreadTaskExecutor), diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 2f8ddb099..6f2e8b82d 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -21,6 +21,7 @@ import os import pickle import threading +import futurist from oslo_utils import excutils from oslo_utils import reflection from oslo_utils import timeutils @@ -31,7 +32,6 @@ from six.moves import queue as compat_queue from taskflow import logging from taskflow import task as task_atom from taskflow.types import failure -from taskflow.types import futures from taskflow.types import notifier from taskflow.types import timing from taskflow.utils import async_utils @@ -357,7 +357,7 @@ class SerialTaskExecutor(TaskExecutor): """Executes tasks one after another.""" def __init__(self): - self._executor = futures.SynchronousExecutor() + self._executor = futurist.SynchronousExecutor() def start(self): self._executor.restart() @@ -429,7 +429,7 @@ class ParallelThreadTaskExecutor(ParallelTaskExecutor): """Executes tasks in parallel using a thread pool executor.""" def _create_executor(self, max_workers=None): - return futures.ThreadPoolExecutor(max_workers=max_workers) + return futurist.ThreadPoolExecutor(max_workers=max_workers) class ParallelProcessTaskExecutor(ParallelTaskExecutor): @@ -459,7 +459,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): self._queue = None def _create_executor(self, max_workers=None): - return futures.ProcessPoolExecutor(max_workers=max_workers) + return futurist.ProcessPoolExecutor(max_workers=max_workers) def start(self): if threading_utils.is_alive(self._worker): diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index b5b7d39b0..6ad86d7bb 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -16,6 +16,7 @@ import functools +from futurist import periodics from oslo_utils import timeutils from taskflow.engines.action_engine import executor @@ -26,7 +27,6 @@ from taskflow.engines.worker_based import types as wt from taskflow import exceptions as exc from taskflow import logging from taskflow import task as task_atom -from taskflow.types import periodic from taskflow.utils import kombu_utils as ku from taskflow.utils import misc from taskflow.utils import threading_utils as tu @@ -67,7 +67,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), after_start=lambda t: self._proxy.wait(), before_join=lambda t: self._proxy.stop()) - p_worker = periodic.PeriodicWorker.create([self._finder]) + p_worker = periodics.PeriodicWorker.create([self._finder]) if p_worker: self._helpers.bind(lambda: tu.daemon_thread(p_worker.start), before_join=lambda t: p_worker.stop(), diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index cbb61ebe5..823f83c97 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -18,8 +18,8 @@ import abc import collections import threading -from concurrent import futures import fasteners +import futurist from oslo_utils import reflection from oslo_utils import timeutils import six @@ -243,7 +243,7 @@ class Request(Message): self._state = WAITING self._lock = threading.Lock() self._created_on = timeutils.utcnow() - self._result = futures.Future() + self._result = futurist.Future() self._result.atom = task self._notifier = task.notifier diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 1ee8f4b86..5d212e5cf 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -20,6 +20,7 @@ import itertools import random import threading +from futurist import periodics from oslo_utils import reflection import six @@ -28,7 +29,6 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base from taskflow.types import notifier -from taskflow.types import periodic from taskflow.types import timing as tt from taskflow.utils import kombu_utils as ku @@ -180,7 +180,7 @@ class ProxyWorkerFinder(WorkerFinder): else: return TopicWorker(topic, tasks) - @periodic.periodic(pr.NOTIFY_PERIOD) + @periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True) def beat(self): """Cyclically called to publish notify message to each topic.""" self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index 8a79133f6..a462c7f3d 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -20,13 +20,13 @@ import socket import string import sys +import futurist from oslo_utils import reflection from taskflow.engines.worker_based import endpoint from taskflow.engines.worker_based import server from taskflow import logging from taskflow import task as t_task -from taskflow.types import futures from taskflow.utils import misc from taskflow.utils import threading_utils as tu from taskflow import version @@ -99,7 +99,7 @@ System details: self._executor = executor self._owns_executor = False if self._executor is None: - self._executor = futures.ThreadPoolExecutor( + self._executor = futurist.ThreadPoolExecutor( max_workers=threads_count) self._owns_executor = True self._endpoints = self._derive_endpoints(tasks) diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py index caba5275f..38a6b3870 100644 --- a/taskflow/examples/hello_world.py +++ b/taskflow/examples/hello_world.py @@ -25,11 +25,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) +import futurist + from taskflow import engines from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.types import futures from taskflow.utils import eventlet_utils @@ -82,19 +83,19 @@ song.add(PrinterTask("conductor@begin", # Run in parallel using eventlet green threads... if eventlet_utils.EVENTLET_AVAILABLE: - with futures.GreenThreadPoolExecutor() as executor: + with futurist.GreenThreadPoolExecutor() as executor: e = engines.load(song, executor=executor, engine='parallel') e.run() # Run in parallel using real threads... -with futures.ThreadPoolExecutor(max_workers=1) as executor: +with futurist.ThreadPoolExecutor(max_workers=1) as executor: e = engines.load(song, executor=executor, engine='parallel') e.run() # Run in parallel using external processes... -with futures.ProcessPoolExecutor(max_workers=1) as executor: +with futurist.ProcessPoolExecutor(max_workers=1) as executor: e = engines.load(song, executor=executor, engine='parallel') e.run() diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py index f4550c20c..e06e36d8f 100644 --- a/taskflow/examples/parallel_table_multiply.py +++ b/taskflow/examples/parallel_table_multiply.py @@ -27,12 +27,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) +import futurist from six.moves import range as compat_range from taskflow import engines from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.types import futures from taskflow.utils import eventlet_utils # INTRO: This example walks through a miniature workflow which does a parallel @@ -98,9 +98,9 @@ def main(): # Now run it (using the specified executor)... if eventlet_utils.EVENTLET_AVAILABLE: - executor = futures.GreenThreadPoolExecutor(max_workers=5) + executor = futurist.GreenThreadPoolExecutor(max_workers=5) else: - executor = futures.ThreadPoolExecutor(max_workers=5) + executor = futurist.ThreadPoolExecutor(max_workers=5) try: e = engines.load(f, engine='parallel', executor=executor) for st in e.run_iter(): diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py index 8c7d4aeab..ec2293bfe 100644 --- a/taskflow/examples/resume_vm_boot.py +++ b/taskflow/examples/resume_vm_boot.py @@ -31,6 +31,7 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) +import futurist from oslo_utils import uuidutils from taskflow import engines @@ -38,7 +39,6 @@ from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task -from taskflow.types import futures from taskflow.utils import eventlet_utils from taskflow.utils import persistence_utils as p_utils @@ -239,7 +239,7 @@ with eu.get_backend() as backend: # Set up how we want our engine to run, serial, parallel... executor = None if eventlet_utils.EVENTLET_AVAILABLE: - executor = futures.GreenThreadPoolExecutor(5) + executor = futurist.GreenThreadPoolExecutor(5) # Create/fetch a logbook that will track the workflows work. book = None diff --git a/taskflow/examples/share_engine_thread.py b/taskflow/examples/share_engine_thread.py index 5654fa066..5223721b1 100644 --- a/taskflow/examples/share_engine_thread.py +++ b/taskflow/examples/share_engine_thread.py @@ -27,12 +27,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) sys.path.insert(0, top_dir) +import futurist import six from taskflow import engines from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.types import futures from taskflow.utils import threading_utils as tu # INTRO: in this example we create 2 dummy flow(s) with a 2 dummy task(s), and @@ -61,7 +61,7 @@ f2.add(DelayedTask("f2-1")) f2.add(DelayedTask("f2-2")) # Run them all using the same futures (thread-pool based) executor... -with futures.ThreadPoolExecutor() as ex: +with futurist.ThreadPoolExecutor() as ex: e1 = engines.load(f1, engine='parallel', executor=ex) e2 = engines.load(f2, engine='parallel', executor=ex) iters = [e1.run_iter(), e2.run_iter()] diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 15b310340..9df706e76 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -20,8 +20,8 @@ import functools import sys import threading -from concurrent import futures import fasteners +import futurist from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers @@ -736,7 +736,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if self._conf.get('check_compatible', True): kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION) if self._worker is None and self._emit_notifications: - self._worker = futures.ThreadPoolExecutor(max_workers=1) + self._worker = futurist.ThreadPoolExecutor(max_workers=1) self._client.ensure_path(self.path) self._client.ensure_path(self.trash_path) if self._job_watcher is None: diff --git a/taskflow/tests/unit/action_engine/test_creation.py b/taskflow/tests/unit/action_engine/test_creation.py index 2c6ed585f..a2390993f 100644 --- a/taskflow/tests/unit/action_engine/test_creation.py +++ b/taskflow/tests/unit/action_engine/test_creation.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist import testtools from taskflow.engines.action_engine import engine @@ -22,7 +23,6 @@ from taskflow.patterns import linear_flow as lf from taskflow.persistence import backends from taskflow import test from taskflow.tests import utils -from taskflow.types import futures as futures from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as pu @@ -50,26 +50,26 @@ class ParallelCreationTest(test.TestCase): executor.ParallelProcessTaskExecutor) def test_thread_executor_creation(self): - with futures.ThreadPoolExecutor(1) as e: + with futurist.ThreadPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) def test_process_executor_creation(self): - with futures.ProcessPoolExecutor(1) as e: + with futurist.ProcessPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, executor.ParallelProcessTaskExecutor) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') def test_green_executor_creation(self): - with futures.GreenThreadPoolExecutor(1) as e: + with futurist.GreenThreadPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) def test_sync_executor_creation(self): - with futures.SynchronousExecutor() as e: + with futurist.SynchronousExecutor() as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index a1ba2ac80..2cde1dea5 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -14,13 +14,13 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist import testtools import taskflow.engines from taskflow import exceptions as exc from taskflow import test from taskflow.tests import utils -from taskflow.types import futures from taskflow.utils import eventlet_utils as eu @@ -192,7 +192,7 @@ class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = futures.GreenThreadPoolExecutor() + executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index ed073e6a4..093fe9b9b 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -17,6 +17,7 @@ import contextlib import functools +import futurist import six import testtools @@ -34,7 +35,6 @@ from taskflow import task from taskflow import test from taskflow.tests import utils from taskflow.types import failure -from taskflow.types import futures from taskflow.types import graph as gr from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as p_utils @@ -977,7 +977,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, def test_using_common_executor(self): flow = utils.TaskNoRequiresNoReturns(name='task1') - executor = futures.ThreadPoolExecutor(self._EXECUTOR_WORKERS) + executor = futurist.ThreadPoolExecutor(self._EXECUTOR_WORKERS) try: e1 = self._make_engine(flow, executor=executor) e2 = self._make_engine(flow, executor=executor) @@ -1002,7 +1002,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None, executor=None, store=None): if executor is None: - executor = futures.GreenThreadPoolExecutor() + executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', diff --git a/taskflow/tests/unit/test_futures.py b/taskflow/tests/unit/test_futures.py deleted file mode 100644 index ce2c69c14..000000000 --- a/taskflow/tests/unit/test_futures.py +++ /dev/null @@ -1,229 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import functools -import threading -import time - -import testtools - -from taskflow import test -from taskflow.types import futures -from taskflow.utils import eventlet_utils as eu - -try: - from eventlet.green import threading as greenthreading - from eventlet.green import time as greentime -except ImportError: - pass - - -def _noop(): - pass - - -def _blowup(): - raise IOError("Broke!") - - -def _return_given(given): - return given - - -def _return_one(): - return 1 - - -def _double(x): - return x * 2 - - -class _SimpleFuturesTestMixin(object): - # This exists to test basic functionality, mainly required to test the - # process executor which has a very restricted set of things it can - # execute (no lambda functions, no instance methods...) - def _make_executor(self, max_workers): - raise NotImplementedError("Not implemented") - - def test_invalid_workers(self): - self.assertRaises(ValueError, self._make_executor, -1) - self.assertRaises(ValueError, self._make_executor, 0) - - def test_exception_transfer(self): - with self._make_executor(2) as e: - f = e.submit(_blowup) - self.assertRaises(IOError, f.result) - self.assertEqual(1, e.statistics.failures) - - def test_accumulator(self): - created = [] - with self._make_executor(5) as e: - for _i in range(0, 10): - created.append(e.submit(_return_one)) - results = [f.result() for f in created] - self.assertEqual(10, sum(results)) - self.assertEqual(10, e.statistics.executed) - - def test_map(self): - count = [i for i in range(0, 100)] - with self._make_executor(5) as e: - results = list(e.map(_double, count)) - initial = sum(count) - self.assertEqual(2 * initial, sum(results)) - - def test_alive(self): - e = self._make_executor(1) - self.assertTrue(e.alive) - e.shutdown() - self.assertFalse(e.alive) - with self._make_executor(1) as e2: - self.assertTrue(e2.alive) - self.assertFalse(e2.alive) - - -class _FuturesTestMixin(_SimpleFuturesTestMixin): - def _delay(self, secs): - raise NotImplementedError("Not implemented") - - def _make_lock(self): - raise NotImplementedError("Not implemented") - - def _make_funcs(self, called, amount): - mutator = self._make_lock() - - def store_call(ident): - with mutator: - called[ident] += 1 - - for i in range(0, amount): - yield functools.partial(store_call, ident=i) - - def test_func_calls(self): - called = collections.defaultdict(int) - - with self._make_executor(2) as e: - for f in self._make_funcs(called, 2): - e.submit(f) - - self.assertEqual(1, called[0]) - self.assertEqual(1, called[1]) - self.assertEqual(2, e.statistics.executed) - - def test_result_callback(self): - called = collections.defaultdict(int) - mutator = self._make_lock() - - def callback(future): - with mutator: - called[future] += 1 - - funcs = list(self._make_funcs(called, 1)) - with self._make_executor(2) as e: - for func in funcs: - f = e.submit(func) - f.add_done_callback(callback) - - self.assertEqual(2, len(called)) - - def test_result_transfer(self): - create_am = 50 - with self._make_executor(2) as e: - fs = [] - for i in range(0, create_am): - fs.append(e.submit(functools.partial(_return_given, i))) - self.assertEqual(create_am, len(fs)) - self.assertEqual(create_am, e.statistics.executed) - for i in range(0, create_am): - result = fs[i].result() - self.assertEqual(i, result) - - def test_called_restricted_size(self): - create_am = 100 - called = collections.defaultdict(int) - - with self._make_executor(1) as e: - for f in self._make_funcs(called, create_am): - e.submit(f) - - self.assertFalse(e.alive) - self.assertEqual(create_am, len(called)) - self.assertEqual(create_am, e.statistics.executed) - - -class ThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin): - def _make_executor(self, max_workers): - return futures.ThreadPoolExecutor(max_workers=max_workers) - - def _delay(self, secs): - time.sleep(secs) - - def _make_lock(self): - return threading.Lock() - - -class ProcessPoolExecutorTest(test.TestCase, _SimpleFuturesTestMixin): - def _make_executor(self, max_workers): - return futures.ProcessPoolExecutor(max_workers=max_workers) - - -class SynchronousExecutorTest(test.TestCase, _FuturesTestMixin): - def _make_executor(self, max_workers): - return futures.SynchronousExecutor() - - def _delay(self, secs): - time.sleep(secs) - - def _make_lock(self): - return threading.Lock() - - def test_invalid_workers(self): - pass - - -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin): - def _make_executor(self, max_workers): - return futures.GreenThreadPoolExecutor(max_workers=max_workers) - - def _delay(self, secs): - greentime.sleep(secs) - - def _make_lock(self): - return greenthreading.Lock() - - def test_cancellation(self): - called = collections.defaultdict(int) - - fs = [] - with self._make_executor(2) as e: - for func in self._make_funcs(called, 2): - fs.append(e.submit(func)) - # Greenthreads don't start executing until we wait for them - # to, since nothing here does IO, this will work out correctly. - # - # If something here did a blocking call, then eventlet could swap - # one of the executors threads in, but nothing in this test does. - for f in fs: - self.assertFalse(f.running()) - f.cancel() - - self.assertEqual(0, len(called)) - self.assertEqual(2, len(fs)) - self.assertEqual(2, e.statistics.cancelled) - for f in fs: - self.assertTrue(f.cancelled()) - self.assertTrue(f.done()) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index ddb256b0e..330c13a72 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist import testtools import taskflow.engines @@ -26,7 +27,6 @@ from taskflow import states as st from taskflow import test from taskflow.tests import utils from taskflow.types import failure -from taskflow.types import futures from taskflow.utils import eventlet_utils as eu @@ -1236,7 +1236,7 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = futures.GreenThreadPoolExecutor() + executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index e5d0288f6..5d11eddd2 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist import testtools import taskflow.engines @@ -22,7 +23,6 @@ from taskflow.patterns import linear_flow as lf from taskflow import states from taskflow import test from taskflow.tests import utils -from taskflow.types import futures from taskflow.utils import eventlet_utils as eu @@ -217,7 +217,7 @@ class ParallelEngineWithEventletTest(SuspendTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = futures.GreenThreadPoolExecutor() + executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index 809c1926a..8980aa5c5 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import time - import networkx as nx import six from six.moves import cPickle as pickle @@ -24,31 +22,9 @@ from taskflow import exceptions as excp from taskflow import test from taskflow.types import fsm from taskflow.types import graph -from taskflow.types import latch -from taskflow.types import periodic from taskflow.types import sets from taskflow.types import table from taskflow.types import tree -from taskflow.utils import threading_utils as tu - - -class PeriodicThingy(object): - def __init__(self): - self.capture = [] - - @periodic.periodic(0.01) - def a(self): - self.capture.append('a') - - @periodic.periodic(0.02) - def b(self): - self.capture.append('b') - - def c(self): - pass - - def d(self): - pass class GraphTest(test.TestCase): @@ -629,110 +605,3 @@ class OrderedSetTest(test.TestCase): es3 = set(s3) self.assertEqual(es.union(es2, es3), s.union(s2, s3)) - - -class PeriodicTest(test.TestCase): - - def test_invalid_periodic(self): - - def no_op(): - pass - - self.assertRaises(ValueError, periodic.periodic, -1) - - def test_valid_periodic(self): - - @periodic.periodic(2) - def no_op(): - pass - - self.assertTrue(getattr(no_op, '_periodic')) - self.assertEqual(2, getattr(no_op, '_periodic_spacing')) - self.assertEqual(True, getattr(no_op, '_periodic_run_immediately')) - - def test_scanning_periodic(self): - p = PeriodicThingy() - w = periodic.PeriodicWorker.create([p]) - self.assertEqual(2, len(w)) - - t = tu.daemon_thread(target=w.start) - t.start() - time.sleep(0.1) - w.stop() - t.join() - - b_calls = [c for c in p.capture if c == 'b'] - self.assertGreater(0, len(b_calls)) - a_calls = [c for c in p.capture if c == 'a'] - self.assertGreater(0, len(a_calls)) - - def test_periodic_single(self): - barrier = latch.Latch(5) - capture = [] - - @periodic.periodic(0.01) - def callee(): - barrier.countdown() - if barrier.needed == 0: - w.stop() - capture.append(1) - - w = periodic.PeriodicWorker([callee]) - t = tu.daemon_thread(target=w.start) - t.start() - t.join() - - self.assertEqual(0, barrier.needed) - self.assertEqual(5, sum(capture)) - - def test_immediate(self): - capture = [] - - @periodic.periodic(120, run_immediately=True) - def a(): - capture.append('a') - - w = periodic.PeriodicWorker([a]) - t = tu.daemon_thread(target=w.start) - t.start() - time.sleep(0.1) - w.stop() - t.join() - - a_calls = [c for c in capture if c == 'a'] - self.assertGreater(0, len(a_calls)) - - def test_period_double_no_immediate(self): - capture = [] - - @periodic.periodic(0.01, run_immediately=False) - def a(): - capture.append('a') - - @periodic.periodic(0.02, run_immediately=False) - def b(): - capture.append('b') - - w = periodic.PeriodicWorker([a, b]) - t = tu.daemon_thread(target=w.start) - t.start() - time.sleep(0.1) - w.stop() - t.join() - - b_calls = [c for c in capture if c == 'b'] - self.assertGreater(0, len(b_calls)) - a_calls = [c for c in capture if c == 'a'] - self.assertGreater(0, len(a_calls)) - - def test_start_nothing_error(self): - w = periodic.PeriodicWorker([]) - self.assertRaises(RuntimeError, w.start) - - def test_missing_function_attrs(self): - - def fake_periodic(): - pass - - cb = fake_periodic - self.assertRaises(ValueError, periodic.PeriodicWorker, [cb]) diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 642be96e7..1f8b01195 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -14,10 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import futurist import testtools from taskflow import test -from taskflow.types import futures from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu @@ -37,14 +37,14 @@ class WaitForAnyTestsMixin(object): self.assertTrue(any(f in done for f in fs)) def test_not_done_futures(self): - fs = [futures.Future(), futures.Future()] + fs = [futurist.Future(), futurist.Future()] done, not_done = au.wait_for_any(fs, self.timeout) self.assertEqual(len(done), 0) self.assertEqual(len(not_done), 2) def test_mixed_futures(self): - f1 = futures.Future() - f2 = futures.Future() + f1 = futurist.Future() + f2 = futurist.Future() f2.set_result(1) done, not_done = au.wait_for_any([f1, f2], self.timeout) self.assertEqual(len(done), 1) @@ -57,13 +57,13 @@ class WaitForAnyTestsMixin(object): class AsyncUtilsEventletTest(test.TestCase, WaitForAnyTestsMixin): def _make_executor(self, max_workers): - return futures.GreenThreadPoolExecutor(max_workers=max_workers) + return futurist.GreenThreadPoolExecutor(max_workers=max_workers) class AsyncUtilsThreadedTest(test.TestCase, WaitForAnyTestsMixin): def _make_executor(self, max_workers): - return futures.ThreadPoolExecutor(max_workers=max_workers) + return futurist.ThreadPoolExecutor(max_workers=max_workers) class MakeCompletedFutureTest(test.TestCase): @@ -78,4 +78,4 @@ class MakeCompletedFutureTest(test.TestCase): class AsyncUtilsSynchronousTest(test.TestCase, WaitForAnyTestsMixin): def _make_executor(self, max_workers): - return futures.SynchronousExecutor() + return futurist.SynchronousExecutor() diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index d944b64bf..e6c21eb4a 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -17,7 +17,7 @@ import threading import time -from concurrent import futures +import futurist from oslo_utils import timeutils from taskflow.engines.worker_based import executor @@ -281,7 +281,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(expected_calls, self.master_mock.mock_calls) def test_wait_for_any(self): - fs = [futures.Future(), futures.Future()] + fs = [futurist.Future(), futurist.Future()] ex = self.executor() ex.wait_for_any(fs) @@ -292,7 +292,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): def test_wait_for_any_with_timeout(self): timeout = 30 - fs = [futures.Future(), futures.Future()] + fs = [futurist.Future(), futurist.Future()] ex = self.executor() ex.wait_for_any(fs, timeout) diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index 8d4de7f5b..3030b8318 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures +import futurist from oslo_utils import uuidutils from taskflow.engines.action_engine import executor as base_executor @@ -39,7 +39,7 @@ class TestPipeline(test.TestCase): endpoints.append(endpoint.Endpoint(cls)) server = worker_server.Server( TEST_TOPIC, TEST_EXCHANGE, - futures.ThreadPoolExecutor(1), endpoints, + futurist.ThreadPoolExecutor(max_workers=1), endpoints, transport='memory', transport_options={ 'polling_interval': POLLING_INTERVAL, diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 73fe9c8c9..4647ae8ce 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures +import futurist from oslo_utils import uuidutils from taskflow.engines.action_engine import executor @@ -135,7 +135,7 @@ class TestProtocol(test.TestCase): request = self.request() self.assertEqual(request.uuid, self.task_uuid) self.assertEqual(request.task, self.task) - self.assertIsInstance(request.result, futures.Future) + self.assertIsInstance(request.result, futurist.Future) self.assertFalse(request.result.done()) def test_to_dict_default(self): diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 3acf245b2..f4a02aead 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -37,7 +37,7 @@ class TestWorker(test.MockTestCase): # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( - worker.futures, 'ThreadPoolExecutor', attach_as='executor') + worker.futurist, 'ThreadPoolExecutor', attach_as='executor') self.server_mock, self.server_inst_mock = self.patchClass( worker.server, 'Server') diff --git a/taskflow/types/futures.py b/taskflow/types/futures.py deleted file mode 100644 index da79fc8ed..000000000 --- a/taskflow/types/futures.py +++ /dev/null @@ -1,444 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import sys -import threading - -from concurrent import futures as _futures -from concurrent.futures import process as _process -from concurrent.futures import thread as _thread -from oslo_utils import importutils -from oslo_utils import reflection -import six - -greenpatcher = importutils.try_import('eventlet.patcher') -greenpool = importutils.try_import('eventlet.greenpool') -greenqueue = importutils.try_import('eventlet.queue') -greenthreading = importutils.try_import('eventlet.green.threading') - -from taskflow.types import timing -from taskflow.utils import eventlet_utils as eu -from taskflow.utils import threading_utils as tu - - -# NOTE(harlowja): Allows for simpler access to this type... -Future = _futures.Future - - -class _Gatherer(object): - def __init__(self, submit_func, - lock_cls=threading.Lock, start_before_submit=False): - self._submit_func = submit_func - self._stats_lock = lock_cls() - self._stats = ExecutorStatistics() - self._start_before_submit = start_before_submit - - @property - def statistics(self): - return self._stats - - def clear(self): - with self._stats_lock: - self._stats = ExecutorStatistics() - - def _capture_stats(self, watch, fut): - """Capture statistics - - :param watch: stopwatch object - :param fut: future object - """ - watch.stop() - with self._stats_lock: - # Use a new collection and lock so that all mutations are seen as - # atomic and not overlapping and corrupting with other - # mutations (the clone ensures that others reading the current - # values will not see a mutated/corrupted one). Since futures may - # be completed by different threads we need to be extra careful to - # gather this data in a way that is thread-safe... - (failures, executed, runtime, cancelled) = (self._stats.failures, - self._stats.executed, - self._stats.runtime, - self._stats.cancelled) - if fut.cancelled(): - cancelled += 1 - else: - executed += 1 - if fut.exception() is not None: - failures += 1 - runtime += watch.elapsed() - self._stats = ExecutorStatistics(failures=failures, - executed=executed, - runtime=runtime, - cancelled=cancelled) - - def submit(self, fn, *args, **kwargs): - """Submit work to be executed and capture statistics.""" - watch = timing.StopWatch() - if self._start_before_submit: - watch.start() - fut = self._submit_func(fn, *args, **kwargs) - if not self._start_before_submit: - watch.start() - fut.add_done_callback(functools.partial(self._capture_stats, watch)) - return fut - - -class ThreadPoolExecutor(_thread.ThreadPoolExecutor): - """Executor that uses a thread pool to execute calls asynchronously. - - It gathers statistics about the submissions executed for post-analysis... - - See: https://docs.python.org/dev/library/concurrent.futures.html - """ - def __init__(self, max_workers=None): - if max_workers is None: - max_workers = tu.get_optimal_thread_count() - super(ThreadPoolExecutor, self).__init__(max_workers=max_workers) - if self._max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - self._gatherer = _Gatherer( - # Since our submit will use this gatherer we have to reference - # the parent submit, bound to this instance (which is what we - # really want to use anyway). - super(ThreadPoolExecutor, self).submit) - - @property - def max_workers(self): - """The max number of workers that this executor will ever have.""" - return self._max_workers - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - return self._gatherer.submit(fn, *args, **kwargs) - - -class ProcessPoolExecutor(_process.ProcessPoolExecutor): - """Executor that uses a process pool to execute calls asynchronously. - - It gathers statistics about the submissions executed for post-analysis... - - See: https://docs.python.org/dev/library/concurrent.futures.html - """ - def __init__(self, max_workers=None): - if max_workers is None: - max_workers = tu.get_optimal_thread_count() - super(ProcessPoolExecutor, self).__init__(max_workers=max_workers) - if self._max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - self._gatherer = _Gatherer( - # Since our submit will use this gatherer we have to reference - # the parent submit, bound to this instance (which is what we - # really want to use anyway). - super(ProcessPoolExecutor, self).submit) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown_thread - - @property - def max_workers(self): - """The max number of workers that this executor will ever have.""" - return self._max_workers - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - return self._gatherer.submit(fn, *args, **kwargs) - - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - if six.PY2: - self.future.set_exception_info(exc_value, exc_tb) - else: - self.future.set_exception(exc_value) - finally: - del(exc_type, exc_value, exc_tb) - else: - self.future.set_result(result) - - -class SynchronousExecutor(_futures.Executor): - """Executor that uses the caller to execute calls synchronously. - - This provides an interface to a caller that looks like an executor but - will execute the calls inside the caller thread instead of executing it - in a external process/thread for when this type of functionality is - useful to provide... - - It gathers statistics about the submissions executed for post-analysis... - """ - - def __init__(self): - self._shutoff = False - self._gatherer = _Gatherer(self._submit, - start_before_submit=True) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutoff - - def shutdown(self, wait=True): - self._shutoff = True - - def restart(self): - """Restarts this executor (*iff* previously shutoff/shutdown). - - NOTE(harlowja): clears any previously gathered statistics. - """ - if self._shutoff: - self._shutoff = False - self._gatherer.clear() - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics).""" - if self._shutoff: - raise RuntimeError('Can not schedule new futures' - ' after being shutdown') - return self._gatherer.submit(fn, *args, **kwargs) - - def _submit(self, fn, *args, **kwargs): - f = Future() - runner = _WorkItem(f, fn, args, kwargs) - runner.run() - return f - - -class _GreenWorker(object): - def __init__(self, executor, work, work_queue): - self.executor = executor - self.work = work - self.work_queue = work_queue - - def __call__(self): - # Run our main piece of work. - try: - self.work.run() - finally: - # Consume any delayed work before finishing (this is how we finish - # work that was to big for the pool size, but needs to be finished - # no matter). - while True: - try: - w = self.work_queue.get_nowait() - except greenqueue.Empty: - break - else: - try: - w.run() - finally: - self.work_queue.task_done() - - -class GreenFuture(Future): - def __init__(self): - super(GreenFuture, self).__init__() - eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green' - ' future')) - # NOTE(harlowja): replace the built-in condition with a greenthread - # compatible one so that when getting the result of this future the - # functions will correctly yield to eventlet. If this is not done then - # waiting on the future never actually causes the greenthreads to run - # and thus you wait for infinity. - if not greenpatcher.is_monkey_patched('threading'): - self._condition = greenthreading.Condition() - - -class GreenThreadPoolExecutor(_futures.Executor): - """Executor that uses a green thread pool to execute calls asynchronously. - - See: https://docs.python.org/dev/library/concurrent.futures.html - and http://eventlet.net/doc/modules/greenpool.html for information on - how this works. - - It gathers statistics about the submissions executed for post-analysis... - """ - - def __init__(self, max_workers=1000): - eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green' - ' executor')) - if max_workers <= 0: - raise ValueError("Max workers must be greater than zero") - self._max_workers = max_workers - self._pool = greenpool.GreenPool(self._max_workers) - self._delayed_work = greenqueue.Queue() - self._shutdown_lock = greenthreading.Lock() - self._shutdown = False - self._gatherer = _Gatherer(self._submit, - lock_cls=greenthreading.Lock) - - @property - def alive(self): - """Accessor to determine if the executor is alive/active.""" - return not self._shutdown - - @property - def statistics(self): - """:class:`.ExecutorStatistics` about the executors executions.""" - return self._gatherer.statistics - - def submit(self, fn, *args, **kwargs): - """Submit some work to be executed (and gather statistics). - - :param args: non-keyworded arguments - :type args: list - :param kwargs: key-value arguments - :type kwargs: dictionary - """ - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('Can not schedule new futures' - ' after being shutdown') - return self._gatherer.submit(fn, *args, **kwargs) - - def _submit(self, fn, *args, **kwargs): - f = GreenFuture() - work = _WorkItem(f, fn, args, kwargs) - if not self._spin_up(work): - self._delayed_work.put(work) - return f - - def _spin_up(self, work): - """Spin up a greenworker if less than max_workers. - - :param work: work to be given to the greenworker - :returns: whether a green worker was spun up or not - :rtype: boolean - """ - alive = self._pool.running() + self._pool.waiting() - if alive < self._max_workers: - self._pool.spawn_n(_GreenWorker(self, work, self._delayed_work)) - return True - return False - - def shutdown(self, wait=True): - with self._shutdown_lock: - if not self._shutdown: - self._shutdown = True - shutoff = True - else: - shutoff = False - if wait and shutoff: - self._pool.waitall() - self._delayed_work.join() - - -class ExecutorStatistics(object): - """Holds *immutable* information about a executors executions.""" - - __slots__ = ['_failures', '_executed', '_runtime', '_cancelled'] - - __repr_format = ("failures=%(failures)s, executed=%(executed)s, " - "runtime=%(runtime)s, cancelled=%(cancelled)s") - - def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0): - self._failures = failures - self._executed = executed - self._runtime = runtime - self._cancelled = cancelled - - @property - def failures(self): - """How many submissions ended up raising exceptions. - - :returns: how many submissions ended up raising exceptions - :rtype: number - """ - return self._failures - - @property - def executed(self): - """How many submissions were executed (failed or not). - - :returns: how many submissions were executed - :rtype: number - """ - return self._executed - - @property - def runtime(self): - """Total runtime of all submissions executed (failed or not). - - :returns: total runtime of all submissions executed - :rtype: number - """ - return self._runtime - - @property - def cancelled(self): - """How many submissions were cancelled before executing. - - :returns: how many submissions were cancelled before executing - :rtype: number - """ - return self._cancelled - - @property - def average_runtime(self): - """The average runtime of all submissions executed. - - :returns: average runtime of all submissions executed - :rtype: number - :raises: ZeroDivisionError when no executions have occurred. - """ - return self._runtime / self._executed - - def __repr__(self): - r = reflection.get_class_name(self, fully_qualified=False) - r += "(" - r += self.__repr_format % ({ - 'failures': self._failures, - 'executed': self._executed, - 'runtime': self._runtime, - 'cancelled': self._cancelled, - }) - r += ")" - return r diff --git a/taskflow/types/periodic.py b/taskflow/types/periodic.py deleted file mode 100644 index 872326da1..000000000 --- a/taskflow/types/periodic.py +++ /dev/null @@ -1,209 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import heapq -import inspect -import threading - -from debtcollector import removals -import monotonic -from oslo_utils import reflection -import six - -from taskflow import logging -from taskflow.utils import misc - -LOG = logging.getLogger(__name__) - - -def _check_attrs(obj): - """Checks that a periodic function/method has all the expected attributes. - - This will return the expected attributes that were **not** found. - """ - missing_attrs = [] - for a in ('_periodic', '_periodic_spacing', '_periodic_run_immediately'): - if not hasattr(obj, a): - missing_attrs.append(a) - return missing_attrs - - -def periodic(spacing, run_immediately=True): - """Tags a method/function as wanting/able to execute periodically. - - :param run_immediately: option to specify whether to run - immediately or not - :type run_immediately: boolean - """ - - if spacing <= 0: - raise ValueError("Periodicity/spacing must be greater than" - " zero instead of %s" % spacing) - - def wrapper(f): - f._periodic = True - f._periodic_spacing = spacing - f._periodic_run_immediately = run_immediately - - @six.wraps(f) - def decorator(*args, **kwargs): - return f(*args, **kwargs) - - return decorator - - return wrapper - - -class _Schedule(object): - """Internal heap-based structure that maintains the schedule/ordering.""" - - def __init__(self): - self._ordering = [] - - def push(self, next_run, index): - heapq.heappush(self._ordering, (next_run, index)) - - def push_next(self, cb, index, now=None): - if now is None: - now = monotonic.monotonic() - self.push(now + cb._periodic_spacing, index) - - def __len__(self): - return len(self._ordering) - - def pop(self): - return heapq.heappop(self._ordering) - - -def _build(callables): - schedule = _Schedule() - now = None - immediates = [] - # Reverse order is used since these are later popped off (and to - # ensure the popping order is first -> last we need to append them - # in the opposite ordering last -> first). - for i, cb in misc.reverse_enumerate(callables): - if cb._periodic_run_immediately: - immediates.append(i) - else: - if now is None: - now = monotonic.monotonic() - schedule.push_next(cb, i, now=now) - return immediates, schedule - - -def _safe_call(cb, kind): - try: - cb() - except Exception: - LOG.warn("Failed to call %s '%r'", kind, cb, exc_info=True) - - -class PeriodicWorker(object): - """Calls a collection of callables periodically (sleeping as needed...). - - NOTE(harlowja): typically the :py:meth:`.start` method is executed in a - background thread so that the periodic callables are executed in - the background/asynchronously (using the defined periods to determine - when each is called). - """ - - @classmethod - def create(cls, objects, exclude_hidden=True): - """Automatically creates a worker by analyzing object(s) methods. - - Only picks up methods that have been tagged/decorated with - the :py:func:`.periodic` decorator (does not match against private - or protected methods unless explicitly requested to). - """ - callables = [] - for obj in objects: - for (name, member) in inspect.getmembers(obj): - if name.startswith("_") and exclude_hidden: - continue - if reflection.is_bound_method(member): - missing_attrs = _check_attrs(member) - if not missing_attrs: - callables.append(member) - return cls(callables) - - @removals.removed_kwarg('tombstone', version="0.8", removal_version="?") - def __init__(self, callables, tombstone=None): - if tombstone is None: - self._tombstone = threading.Event() - else: - self._tombstone = tombstone - self._callables = [] - for i, cb in enumerate(callables, 1): - if not six.callable(cb): - raise ValueError("Periodic callback %s must be callable" % i) - missing_attrs = _check_attrs(cb) - if missing_attrs: - raise ValueError("Periodic callback %s missing required" - " attributes %s" % (i, missing_attrs)) - if cb._periodic: - self._callables.append(cb) - self._immediates, self._schedule = _build(self._callables) - - def __len__(self): - return len(self._callables) - - def start(self): - """Starts running (will not return until :py:meth:`.stop` is called). - - NOTE(harlowja): If this worker has no contained callables this raises - a runtime error and does not run since it is impossible to periodically - run nothing. - """ - if not self._callables: - raise RuntimeError("A periodic worker can not start" - " without any callables") - while not self._tombstone.is_set(): - if self._immediates: - # Run & schedule its next execution. - index = self._immediates.pop() - cb = self._callables[index] - LOG.blather("Calling immediate '%r'", cb) - _safe_call(cb, 'immediate') - self._schedule.push_next(cb, index) - else: - # Figure out when we should run next (by selecting the - # minimum item from the heap, where the minimum should be - # the callable that needs to run next and has the lowest - # next desired run time). - now = monotonic.monotonic() - next_run, index = self._schedule.pop() - when_next = next_run - now - if when_next <= 0: - # Run & schedule its next execution. - cb = self._callables[index] - LOG.blather("Calling periodic '%r' (it runs every" - " %s seconds)", cb, cb._periodic_spacing) - _safe_call(cb, 'periodic') - self._schedule.push_next(cb, index, now=now) - else: - # Gotta wait... - self._schedule.push(next_run, index) - self._tombstone.wait(when_next) - - def stop(self): - """Sets the tombstone (this stops any further executions).""" - self._tombstone.set() - - def reset(self): - """Resets the tombstone and re-queues up any immediate executions.""" - self._tombstone.clear() - self._immediates, self._schedule = _build(self._callables) diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index aec62abf0..c4b114b8d 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -16,11 +16,11 @@ from concurrent import futures as _futures from concurrent.futures import _base +import futurist from oslo_utils import importutils greenthreading = importutils.try_import('eventlet.green.threading') -from taskflow.types import futures from taskflow.utils import eventlet_utils as eu @@ -32,7 +32,7 @@ _DONE_STATES = frozenset([ def make_completed_future(result): """Make and return a future completed with a given result.""" - future = futures.Future() + future = futurist.Future() future.set_result(result) return future @@ -47,7 +47,10 @@ def wait_for_any(fs, timeout=None): Returns pair (done futures, not done futures). """ - green_fs = sum(1 for f in fs if isinstance(f, futures.GreenFuture)) + # TODO(harlowja): remove this when + # https://review.openstack.org/#/c/196269/ is merged and is made + # available. + green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) if not green_fs: return _futures.wait(fs, timeout=timeout, diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index e859fffa5..7de0151d8 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -15,7 +15,6 @@ # under the License. import collections -import multiprocessing import threading import six @@ -36,17 +35,6 @@ def get_ident(): return _thread.get_ident() -def get_optimal_thread_count(): - """Try to guess optimal thread count for current system.""" - try: - return multiprocessing.cpu_count() + 1 - except NotImplementedError: - # NOTE(harlowja): apparently may raise so in this case we will - # just setup two threads since it's hard to know what else we - # should do in this situation. - return 2 - - def daemon_thread(target, *args, **kwargs): """Makes a daemon thread that calls the given target when started.""" thread = threading.Thread(target=target, args=args, kwargs=kwargs)