Integrate futurist (and **remove** taskflow originating code)

Change-Id: If89baa042695f19e42b6368034f3ccf22c2cf0aa
This commit is contained in:
Joshua Harlow 2015-06-26 14:42:18 -07:00
parent 87c12603eb
commit 27272a2aa7
30 changed files with 68 additions and 1095 deletions

View File

@ -29,11 +29,6 @@ FSM
.. automodule:: taskflow.types.fsm
Futures
=======
.. automodule:: taskflow.types.futures
Graph
=====
@ -45,11 +40,6 @@ Notifier
.. automodule:: taskflow.types.notifier
:special-members: __call__
Periodic
========
.. automodule:: taskflow.types.periodic
Sets
====

View File

@ -13,6 +13,9 @@ six>=1.9.0
# Enum library made for <= python 3.3
enum34;python_version=='2.7' or python_version=='2.6'
# For async and/or periodic work
futurist>=0.1.1 # Apache-2.0
# For reader/writer + interprocess locks.
fasteners>=0.7 # Apache-2.0

View File

@ -14,13 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from taskflow.engines.action_engine.actions import base
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
from taskflow import states
from taskflow.types import failure
from taskflow.types import futures
LOG = logging.getLogger(__name__)
@ -46,7 +47,7 @@ class RetryAction(base.Action):
def __init__(self, storage, notifier):
super(RetryAction, self).__init__(storage, notifier)
self._executor = futures.SynchronousExecutor()
self._executor = futurist.SynchronousExecutor()
def _get_retry_args(self, retry, addons=None):
arguments = self._storage.fetch_mapped_args(

View File

@ -364,7 +364,7 @@ String (case insensitive) Executor used
#
# NOTE(harlowja): the reason we use the library/built-in futures is to
# allow for instances of that to be detected and handled correctly, instead
# of forcing everyone to use our derivatives...
# of forcing everyone to use our derivatives (futurist or other)...
_executor_cls_matchers = [
_ExecutorTypeMatch((futures.ThreadPoolExecutor,),
executor.ParallelThreadTaskExecutor),

View File

@ -21,6 +21,7 @@ import os
import pickle
import threading
import futurist
from oslo_utils import excutils
from oslo_utils import reflection
from oslo_utils import timeutils
@ -31,7 +32,6 @@ from six.moves import queue as compat_queue
from taskflow import logging
from taskflow import task as task_atom
from taskflow.types import failure
from taskflow.types import futures
from taskflow.types import notifier
from taskflow.types import timing
from taskflow.utils import async_utils
@ -357,7 +357,7 @@ class SerialTaskExecutor(TaskExecutor):
"""Executes tasks one after another."""
def __init__(self):
self._executor = futures.SynchronousExecutor()
self._executor = futurist.SynchronousExecutor()
def start(self):
self._executor.restart()
@ -429,7 +429,7 @@ class ParallelThreadTaskExecutor(ParallelTaskExecutor):
"""Executes tasks in parallel using a thread pool executor."""
def _create_executor(self, max_workers=None):
return futures.ThreadPoolExecutor(max_workers=max_workers)
return futurist.ThreadPoolExecutor(max_workers=max_workers)
class ParallelProcessTaskExecutor(ParallelTaskExecutor):
@ -459,7 +459,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
self._queue = None
def _create_executor(self, max_workers=None):
return futures.ProcessPoolExecutor(max_workers=max_workers)
return futurist.ProcessPoolExecutor(max_workers=max_workers)
def start(self):
if threading_utils.is_alive(self._worker):

View File

@ -16,6 +16,7 @@
import functools
from futurist import periodics
from oslo_utils import timeutils
from taskflow.engines.action_engine import executor
@ -26,7 +27,6 @@ from taskflow.engines.worker_based import types as wt
from taskflow import exceptions as exc
from taskflow import logging
from taskflow import task as task_atom
from taskflow.types import periodic
from taskflow.utils import kombu_utils as ku
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
@ -67,7 +67,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
after_start=lambda t: self._proxy.wait(),
before_join=lambda t: self._proxy.stop())
p_worker = periodic.PeriodicWorker.create([self._finder])
p_worker = periodics.PeriodicWorker.create([self._finder])
if p_worker:
self._helpers.bind(lambda: tu.daemon_thread(p_worker.start),
before_join=lambda t: p_worker.stop(),

View File

@ -18,8 +18,8 @@ import abc
import collections
import threading
from concurrent import futures
import fasteners
import futurist
from oslo_utils import reflection
from oslo_utils import timeutils
import six
@ -243,7 +243,7 @@ class Request(Message):
self._state = WAITING
self._lock = threading.Lock()
self._created_on = timeutils.utcnow()
self._result = futures.Future()
self._result = futurist.Future()
self._result.atom = task
self._notifier = task.notifier

View File

@ -20,6 +20,7 @@ import itertools
import random
import threading
from futurist import periodics
from oslo_utils import reflection
import six
@ -28,7 +29,6 @@ from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.types import cache as base
from taskflow.types import notifier
from taskflow.types import periodic
from taskflow.types import timing as tt
from taskflow.utils import kombu_utils as ku
@ -180,7 +180,7 @@ class ProxyWorkerFinder(WorkerFinder):
else:
return TopicWorker(topic, tasks)
@periodic.periodic(pr.NOTIFY_PERIOD)
@periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True)
def beat(self):
"""Cyclically called to publish notify message to each topic."""
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)

View File

@ -20,13 +20,13 @@ import socket
import string
import sys
import futurist
from oslo_utils import reflection
from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import server
from taskflow import logging
from taskflow import task as t_task
from taskflow.types import futures
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
from taskflow import version
@ -99,7 +99,7 @@ System details:
self._executor = executor
self._owns_executor = False
if self._executor is None:
self._executor = futures.ThreadPoolExecutor(
self._executor = futurist.ThreadPoolExecutor(
max_workers=threads_count)
self._owns_executor = True
self._endpoints = self._derive_endpoints(tasks)

View File

@ -25,11 +25,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
import futurist
from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
@ -82,19 +83,19 @@ song.add(PrinterTask("conductor@begin",
# Run in parallel using eventlet green threads...
if eventlet_utils.EVENTLET_AVAILABLE:
with futures.GreenThreadPoolExecutor() as executor:
with futurist.GreenThreadPoolExecutor() as executor:
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run in parallel using real threads...
with futures.ThreadPoolExecutor(max_workers=1) as executor:
with futurist.ThreadPoolExecutor(max_workers=1) as executor:
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run in parallel using external processes...
with futures.ProcessPoolExecutor(max_workers=1) as executor:
with futurist.ProcessPoolExecutor(max_workers=1) as executor:
e = engines.load(song, executor=executor, engine='parallel')
e.run()

View File

@ -27,12 +27,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
import futurist
from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
# INTRO: This example walks through a miniature workflow which does a parallel
@ -98,9 +98,9 @@ def main():
# Now run it (using the specified executor)...
if eventlet_utils.EVENTLET_AVAILABLE:
executor = futures.GreenThreadPoolExecutor(max_workers=5)
executor = futurist.GreenThreadPoolExecutor(max_workers=5)
else:
executor = futures.ThreadPoolExecutor(max_workers=5)
executor = futurist.ThreadPoolExecutor(max_workers=5)
try:
e = engines.load(f, engine='parallel', executor=executor)
for st in e.run_iter():

View File

@ -31,6 +31,7 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
import futurist
from oslo_utils import uuidutils
from taskflow import engines
@ -38,7 +39,6 @@ from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
from taskflow.utils import persistence_utils as p_utils
@ -239,7 +239,7 @@ with eu.get_backend() as backend:
# Set up how we want our engine to run, serial, parallel...
executor = None
if eventlet_utils.EVENTLET_AVAILABLE:
executor = futures.GreenThreadPoolExecutor(5)
executor = futurist.GreenThreadPoolExecutor(5)
# Create/fetch a logbook that will track the workflows work.
book = None

View File

@ -27,12 +27,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
import futurist
import six
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import threading_utils as tu
# INTRO: in this example we create 2 dummy flow(s) with a 2 dummy task(s), and
@ -61,7 +61,7 @@ f2.add(DelayedTask("f2-1"))
f2.add(DelayedTask("f2-2"))
# Run them all using the same futures (thread-pool based) executor...
with futures.ThreadPoolExecutor() as ex:
with futurist.ThreadPoolExecutor() as ex:
e1 = engines.load(f1, engine='parallel', executor=ex)
e2 = engines.load(f2, engine='parallel', executor=ex)
iters = [e1.run_iter(), e2.run_iter()]

View File

@ -20,8 +20,8 @@ import functools
import sys
import threading
from concurrent import futures
import fasteners
import futurist
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
from kazoo.recipe import watchers
@ -736,7 +736,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if self._conf.get('check_compatible', True):
kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION)
if self._worker is None and self._emit_notifications:
self._worker = futures.ThreadPoolExecutor(max_workers=1)
self._worker = futurist.ThreadPoolExecutor(max_workers=1)
self._client.ensure_path(self.path)
self._client.ensure_path(self.trash_path)
if self._job_watcher is None:

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
from taskflow.engines.action_engine import engine
@ -22,7 +23,6 @@ from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends
from taskflow import test
from taskflow.tests import utils
from taskflow.types import futures as futures
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as pu
@ -50,26 +50,26 @@ class ParallelCreationTest(test.TestCase):
executor.ParallelProcessTaskExecutor)
def test_thread_executor_creation(self):
with futures.ThreadPoolExecutor(1) as e:
with futurist.ThreadPoolExecutor(1) as e:
eng = self._create_engine(executor=e)
self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor)
def test_process_executor_creation(self):
with futures.ProcessPoolExecutor(1) as e:
with futurist.ProcessPoolExecutor(1) as e:
eng = self._create_engine(executor=e)
self.assertIsInstance(eng._task_executor,
executor.ParallelProcessTaskExecutor)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
def test_green_executor_creation(self):
with futures.GreenThreadPoolExecutor(1) as e:
with futurist.GreenThreadPoolExecutor(1) as e:
eng = self._create_engine(executor=e)
self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor)
def test_sync_executor_creation(self):
with futures.SynchronousExecutor() as e:
with futurist.SynchronousExecutor() as e:
eng = self._create_engine(executor=e)
self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor)

View File

@ -14,13 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
import taskflow.engines
from taskflow import exceptions as exc
from taskflow import test
from taskflow.tests import utils
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
@ -192,7 +192,7 @@ class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow,
flow_detail=flow_detail,

View File

@ -17,6 +17,7 @@
import contextlib
import functools
import futurist
import six
import testtools
@ -34,7 +35,6 @@ from taskflow import task
from taskflow import test
from taskflow.tests import utils
from taskflow.types import failure
from taskflow.types import futures
from taskflow.types import graph as gr
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as p_utils
@ -977,7 +977,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
def test_using_common_executor(self):
flow = utils.TaskNoRequiresNoReturns(name='task1')
executor = futures.ThreadPoolExecutor(self._EXECUTOR_WORKERS)
executor = futurist.ThreadPoolExecutor(self._EXECUTOR_WORKERS)
try:
e1 = self._make_engine(flow, executor=executor)
e2 = self._make_engine(flow, executor=executor)
@ -1002,7 +1002,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
def _make_engine(self, flow,
flow_detail=None, executor=None, store=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',

View File

@ -1,229 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import threading
import time
import testtools
from taskflow import test
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
try:
from eventlet.green import threading as greenthreading
from eventlet.green import time as greentime
except ImportError:
pass
def _noop():
pass
def _blowup():
raise IOError("Broke!")
def _return_given(given):
return given
def _return_one():
return 1
def _double(x):
return x * 2
class _SimpleFuturesTestMixin(object):
    # This exists to test basic functionality, mainly required to test the
    # process executor which has a very restricted set of things it can
    # execute (no lambda functions, no instance methods...)

    def _make_executor(self, max_workers):
        """Build the executor under test; subclasses must override."""
        raise NotImplementedError("Not implemented")

    def test_invalid_workers(self):
        # A non-positive worker count must be rejected at construction time.
        self.assertRaises(ValueError, self._make_executor, -1)
        self.assertRaises(ValueError, self._make_executor, 0)

    def test_exception_transfer(self):
        # An exception raised inside the submitted callable is re-raised
        # from result() and recorded in the executor's failure statistics.
        with self._make_executor(2) as e:
            f = e.submit(_blowup)
        self.assertRaises(IOError, f.result)
        self.assertEqual(1, e.statistics.failures)

    def test_accumulator(self):
        # Submit ten callables that each return one; all must complete.
        created = []
        with self._make_executor(5) as e:
            for _i in range(0, 10):
                created.append(e.submit(_return_one))
        results = [f.result() for f in created]
        self.assertEqual(10, sum(results))
        self.assertEqual(10, e.statistics.executed)

    def test_map(self):
        # map() should apply the function to every element, preserving all.
        count = [i for i in range(0, 100)]
        with self._make_executor(5) as e:
            results = list(e.map(_double, count))
        initial = sum(count)
        self.assertEqual(2 * initial, sum(results))

    def test_alive(self):
        # The alive property flips to False once the executor is shut down,
        # whether shutdown happens explicitly or by leaving the context.
        e = self._make_executor(1)
        self.assertTrue(e.alive)
        e.shutdown()
        self.assertFalse(e.alive)
        with self._make_executor(1) as e2:
            self.assertTrue(e2.alive)
        self.assertFalse(e2.alive)
class _FuturesTestMixin(_SimpleFuturesTestMixin):
    # Extended tests for executors that can run arbitrary callables
    # (closures, partials bound to locals...), unlike the process executor.

    def _delay(self, secs):
        """Sleep in a way appropriate for the executor; subclasses override."""
        raise NotImplementedError("Not implemented")

    def _make_lock(self):
        """Make a lock appropriate for the executor; subclasses override."""
        raise NotImplementedError("Not implemented")

    def _make_funcs(self, called, amount):
        # Yields `amount` callables; each records its own invocation count
        # in the shared `called` mapping under a lock.
        mutator = self._make_lock()

        def store_call(ident):
            with mutator:
                called[ident] += 1

        for i in range(0, amount):
            yield functools.partial(store_call, ident=i)

    def test_func_calls(self):
        # Each generated callable should be invoked exactly once.
        called = collections.defaultdict(int)
        with self._make_executor(2) as e:
            for f in self._make_funcs(called, 2):
                e.submit(f)
        self.assertEqual(1, called[0])
        self.assertEqual(1, called[1])
        self.assertEqual(2, e.statistics.executed)

    def test_result_callback(self):
        # A done-callback attached to a future must fire after completion;
        # both the callable itself and the callback record into `called`.
        called = collections.defaultdict(int)
        mutator = self._make_lock()

        def callback(future):
            with mutator:
                called[future] += 1

        funcs = list(self._make_funcs(called, 1))
        with self._make_executor(2) as e:
            for func in funcs:
                f = e.submit(func)
                f.add_done_callback(callback)
        self.assertEqual(2, len(called))

    def test_result_transfer(self):
        # Each future must carry back the distinct value it was given.
        create_am = 50
        with self._make_executor(2) as e:
            fs = []
            for i in range(0, create_am):
                fs.append(e.submit(functools.partial(_return_given, i)))
        self.assertEqual(create_am, len(fs))
        self.assertEqual(create_am, e.statistics.executed)
        for i in range(0, create_am):
            result = fs[i].result()
            self.assertEqual(i, result)

    def test_called_restricted_size(self):
        # A single worker still drains the whole backlog before shutdown.
        create_am = 100
        called = collections.defaultdict(int)
        with self._make_executor(1) as e:
            for f in self._make_funcs(called, create_am):
                e.submit(f)
        self.assertFalse(e.alive)
        self.assertEqual(create_am, len(called))
        self.assertEqual(create_am, e.statistics.executed)
class ThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
    """Runs the shared future tests against the thread pool executor."""

    def _make_executor(self, max_workers):
        return futures.ThreadPoolExecutor(max_workers=max_workers)

    def _delay(self, secs):
        # Real OS threads: the standard blocking sleep is appropriate.
        time.sleep(secs)

    def _make_lock(self):
        return threading.Lock()
class ProcessPoolExecutorTest(test.TestCase, _SimpleFuturesTestMixin):
    # Only the *simple* mixin applies: the process executor can not run
    # closures or partials bound to locals (they must be picklable).

    def _make_executor(self, max_workers):
        return futures.ProcessPoolExecutor(max_workers=max_workers)
class SynchronousExecutorTest(test.TestCase, _FuturesTestMixin):
    """Runs the shared future tests against the synchronous executor."""

    def _make_executor(self, max_workers):
        # NOTE(review): max_workers is deliberately ignored here — the
        # synchronous executor runs everything in the calling thread.
        return futures.SynchronousExecutor()

    def _delay(self, secs):
        time.sleep(secs)

    def _make_lock(self):
        return threading.Lock()

    def test_invalid_workers(self):
        # Not applicable: there is no worker count to validate.
        pass
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
    """Runs the shared future tests against the greenthread executor."""

    def _make_executor(self, max_workers):
        return futures.GreenThreadPoolExecutor(max_workers=max_workers)

    def _delay(self, secs):
        # Use the eventlet-green sleep so other greenthreads may switch in.
        greentime.sleep(secs)

    def _make_lock(self):
        return greenthreading.Lock()

    def test_cancellation(self):
        # Futures cancelled before any cooperative yield must never run.
        called = collections.defaultdict(int)
        fs = []
        with self._make_executor(2) as e:
            for func in self._make_funcs(called, 2):
                fs.append(e.submit(func))
            # Greenthreads don't start executing until we wait for them
            # to, since nothing here does IO, this will work out correctly.
            #
            # If something here did a blocking call, then eventlet could swap
            # one of the executors threads in, but nothing in this test does.
            for f in fs:
                self.assertFalse(f.running())
                f.cancel()
        self.assertEqual(0, len(called))
        self.assertEqual(2, len(fs))
        self.assertEqual(2, e.statistics.cancelled)
        for f in fs:
            self.assertTrue(f.cancelled())
            self.assertTrue(f.done())

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
import taskflow.engines
@ -26,7 +27,6 @@ from taskflow import states as st
from taskflow import test
from taskflow.tests import utils
from taskflow.types import failure
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
@ -1236,7 +1236,7 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
import taskflow.engines
@ -22,7 +23,6 @@ from taskflow.patterns import linear_flow as lf
from taskflow import states
from taskflow import test
from taskflow.tests import utils
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
@ -217,7 +217,7 @@ class ParallelEngineWithEventletTest(SuspendTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',

View File

@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import networkx as nx
import six
from six.moves import cPickle as pickle
@ -24,31 +22,9 @@ from taskflow import exceptions as excp
from taskflow import test
from taskflow.types import fsm
from taskflow.types import graph
from taskflow.types import latch
from taskflow.types import periodic
from taskflow.types import sets
from taskflow.types import table
from taskflow.types import tree
from taskflow.utils import threading_utils as tu
class PeriodicThingy(object):
    """Fixture with two periodic methods (a, b) and two plain ones (c, d)."""

    def __init__(self):
        # Records, in order, which periodic method ran.
        self.capture = []

    @periodic.periodic(0.01)
    def a(self):
        self.capture.append('a')

    @periodic.periodic(0.02)
    def b(self):
        self.capture.append('b')

    def c(self):
        pass

    def d(self):
        pass
class GraphTest(test.TestCase):
@ -629,110 +605,3 @@ class OrderedSetTest(test.TestCase):
es3 = set(s3)
self.assertEqual(es.union(es2, es3), s.union(s2, s3))
class PeriodicTest(test.TestCase):
    """Tests for the periodic decorator and the periodic worker."""

    def test_invalid_periodic(self):
        # A negative spacing can never be scheduled; creation must fail.
        def no_op():
            pass

        self.assertRaises(ValueError, periodic.periodic, -1)

    def test_valid_periodic(self):
        @periodic.periodic(2)
        def no_op():
            pass

        # The decorator tags the callable with scheduling metadata that
        # the worker later scans for.
        self.assertTrue(getattr(no_op, '_periodic'))
        self.assertEqual(2, getattr(no_op, '_periodic_spacing'))
        self.assertEqual(True, getattr(no_op, '_periodic_run_immediately'))

    def test_scanning_periodic(self):
        p = PeriodicThingy()
        w = periodic.PeriodicWorker.create([p])
        # Only the two decorated methods (a and b) should be discovered.
        self.assertEqual(2, len(w))
        t = tu.daemon_thread(target=w.start)
        t.start()
        time.sleep(0.1)
        w.stop()
        t.join()
        # NOTE(review): the original used assertGreater(0, len(...)), which
        # asserts 0 > len(...) and can never pass; arguments swapped so the
        # intent (at least one call was captured) is actually checked.
        b_calls = [c for c in p.capture if c == 'b']
        self.assertGreater(len(b_calls), 0)
        a_calls = [c for c in p.capture if c == 'a']
        self.assertGreater(len(a_calls), 0)

    def test_periodic_single(self):
        # The callee stops its own worker once the latch is exhausted.
        barrier = latch.Latch(5)
        capture = []

        @periodic.periodic(0.01)
        def callee():
            barrier.countdown()
            if barrier.needed == 0:
                w.stop()
            capture.append(1)

        w = periodic.PeriodicWorker([callee])
        t = tu.daemon_thread(target=w.start)
        t.start()
        t.join()
        self.assertEqual(0, barrier.needed)
        self.assertEqual(5, sum(capture))

    def test_immediate(self):
        # With run_immediately=True the callable fires once right away,
        # even though its spacing (120s) is far longer than the test.
        capture = []

        @periodic.periodic(120, run_immediately=True)
        def a():
            capture.append('a')

        w = periodic.PeriodicWorker([a])
        t = tu.daemon_thread(target=w.start)
        t.start()
        time.sleep(0.1)
        w.stop()
        t.join()
        a_calls = [c for c in capture if c == 'a']
        # See NOTE(review) above: arguments to assertGreater corrected.
        self.assertGreater(len(a_calls), 0)

    def test_period_double_no_immediate(self):
        # Two callables with different spacings both get scheduled even
        # without an immediate first run.
        capture = []

        @periodic.periodic(0.01, run_immediately=False)
        def a():
            capture.append('a')

        @periodic.periodic(0.02, run_immediately=False)
        def b():
            capture.append('b')

        w = periodic.PeriodicWorker([a, b])
        t = tu.daemon_thread(target=w.start)
        t.start()
        time.sleep(0.1)
        w.stop()
        t.join()
        # See NOTE(review) above: arguments to assertGreater corrected.
        b_calls = [c for c in capture if c == 'b']
        self.assertGreater(len(b_calls), 0)
        a_calls = [c for c in capture if c == 'a']
        self.assertGreater(len(a_calls), 0)

    def test_start_nothing_error(self):
        # Starting a worker with no callables is a programming error.
        w = periodic.PeriodicWorker([])
        self.assertRaises(RuntimeError, w.start)

    def test_missing_function_attrs(self):
        # Plain (undecorated) functions lack the periodic metadata and
        # therefore can not be registered with a worker.
        def fake_periodic():
            pass

        cb = fake_periodic
        self.assertRaises(ValueError, periodic.PeriodicWorker, [cb])

View File

@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import futurist
import testtools
from taskflow import test
from taskflow.types import futures
from taskflow.utils import async_utils as au
from taskflow.utils import eventlet_utils as eu
@ -37,14 +37,14 @@ class WaitForAnyTestsMixin(object):
self.assertTrue(any(f in done for f in fs))
def test_not_done_futures(self):
fs = [futures.Future(), futures.Future()]
fs = [futurist.Future(), futurist.Future()]
done, not_done = au.wait_for_any(fs, self.timeout)
self.assertEqual(len(done), 0)
self.assertEqual(len(not_done), 2)
def test_mixed_futures(self):
f1 = futures.Future()
f2 = futures.Future()
f1 = futurist.Future()
f2 = futurist.Future()
f2.set_result(1)
done, not_done = au.wait_for_any([f1, f2], self.timeout)
self.assertEqual(len(done), 1)
@ -57,13 +57,13 @@ class WaitForAnyTestsMixin(object):
class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futures.GreenThreadPoolExecutor(max_workers=max_workers)
return futurist.GreenThreadPoolExecutor(max_workers=max_workers)
class AsyncUtilsThreadedTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futures.ThreadPoolExecutor(max_workers=max_workers)
return futurist.ThreadPoolExecutor(max_workers=max_workers)
class MakeCompletedFutureTest(test.TestCase):
@ -78,4 +78,4 @@ class MakeCompletedFutureTest(test.TestCase):
class AsyncUtilsSynchronousTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futures.SynchronousExecutor()
return futurist.SynchronousExecutor()

View File

@ -17,7 +17,7 @@
import threading
import time
from concurrent import futures
import futurist
from oslo_utils import timeutils
from taskflow.engines.worker_based import executor
@ -281,7 +281,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertEqual(expected_calls, self.master_mock.mock_calls)
def test_wait_for_any(self):
fs = [futures.Future(), futures.Future()]
fs = [futurist.Future(), futurist.Future()]
ex = self.executor()
ex.wait_for_any(fs)
@ -292,7 +292,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
def test_wait_for_any_with_timeout(self):
timeout = 30
fs = [futures.Future(), futures.Future()]
fs = [futurist.Future(), futurist.Future()]
ex = self.executor()
ex.wait_for_any(fs, timeout)

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import futurist
from oslo_utils import uuidutils
from taskflow.engines.action_engine import executor as base_executor
@ -39,7 +39,7 @@ class TestPipeline(test.TestCase):
endpoints.append(endpoint.Endpoint(cls))
server = worker_server.Server(
TEST_TOPIC, TEST_EXCHANGE,
futures.ThreadPoolExecutor(1), endpoints,
futurist.ThreadPoolExecutor(max_workers=1), endpoints,
transport='memory',
transport_options={
'polling_interval': POLLING_INTERVAL,

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import futurist
from oslo_utils import uuidutils
from taskflow.engines.action_engine import executor
@ -135,7 +135,7 @@ class TestProtocol(test.TestCase):
request = self.request()
self.assertEqual(request.uuid, self.task_uuid)
self.assertEqual(request.task, self.task)
self.assertIsInstance(request.result, futures.Future)
self.assertIsInstance(request.result, futurist.Future)
self.assertFalse(request.result.done())
def test_to_dict_default(self):

View File

@ -37,7 +37,7 @@ class TestWorker(test.MockTestCase):
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(
worker.futures, 'ThreadPoolExecutor', attach_as='executor')
worker.futurist, 'ThreadPoolExecutor', attach_as='executor')
self.server_mock, self.server_inst_mock = self.patchClass(
worker.server, 'Server')

View File

@ -1,444 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import sys
import threading
from concurrent import futures as _futures
from concurrent.futures import process as _process
from concurrent.futures import thread as _thread
from oslo_utils import importutils
from oslo_utils import reflection
import six
greenpatcher = importutils.try_import('eventlet.patcher')
greenpool = importutils.try_import('eventlet.greenpool')
greenqueue = importutils.try_import('eventlet.queue')
greenthreading = importutils.try_import('eventlet.green.threading')
from taskflow.types import timing
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import threading_utils as tu
# NOTE(harlowja): Allows for simpler access to this type...
Future = _futures.Future
class _Gatherer(object):
    """Wraps a submit function and gathers per-future execution statistics.

    :param submit_func: the underlying executor submit to delegate to
    :param lock_cls: lock factory guarding the statistics snapshot
    :param start_before_submit: when true, start timing before submission
                                (so queue wait time is included in runtime)
    """

    def __init__(self, submit_func,
                 lock_cls=threading.Lock, start_before_submit=False):
        self._submit_func = submit_func
        self._stats_lock = lock_cls()
        self._stats = ExecutorStatistics()
        self._start_before_submit = start_before_submit

    @property
    def statistics(self):
        # The most recently published (replace-on-write) snapshot.
        return self._stats

    def clear(self):
        """Reset the gathered statistics back to an empty snapshot."""
        with self._stats_lock:
            self._stats = ExecutorStatistics()

    def _capture_stats(self, watch, fut):
        """Capture statistics

        :param watch: stopwatch object
        :param fut: future object
        """
        watch.stop()
        with self._stats_lock:
            # Use a new collection and lock so that all mutations are seen as
            # atomic and not overlapping and corrupting with other
            # mutations (the clone ensures that others reading the current
            # values will not see a mutated/corrupted one). Since futures may
            # be completed by different threads we need to be extra careful to
            # gather this data in a way that is thread-safe...
            (failures, executed, runtime, cancelled) = (self._stats.failures,
                                                        self._stats.executed,
                                                        self._stats.runtime,
                                                        self._stats.cancelled)
            if fut.cancelled():
                cancelled += 1
            else:
                executed += 1
                if fut.exception() is not None:
                    failures += 1
                runtime += watch.elapsed()
            self._stats = ExecutorStatistics(failures=failures,
                                             executed=executed,
                                             runtime=runtime,
                                             cancelled=cancelled)

    def submit(self, fn, *args, **kwargs):
        """Submit work to be executed and capture statistics."""
        watch = timing.StopWatch()
        if self._start_before_submit:
            watch.start()
        fut = self._submit_func(fn, *args, **kwargs)
        if not self._start_before_submit:
            watch.start()
        # The done-callback runs whenever the future finishes (possibly on
        # a different thread), which is why _capture_stats takes the lock.
        fut.add_done_callback(functools.partial(self._capture_stats, watch))
        return fut
class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
    """Executor that uses a thread pool to execute calls asynchronously.

    Behaves like the stdlib thread pool executor, but routes every
    submission through a gatherer so that statistics are available for
    post-analysis.

    See: https://docs.python.org/dev/library/concurrent.futures.html
    """

    def __init__(self, max_workers=None):
        if max_workers is None:
            # Default worker count is derived from the machines cpu count.
            max_workers = tu.get_optimal_thread_count()
        super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
        if self._max_workers <= 0:
            raise ValueError("Max workers must be greater than zero")
        # Wrap the *parent* submit (bound to this instance), since that is
        # what actually hands work over to the underlying pool.
        parent_submit = super(ThreadPoolExecutor, self).submit
        self._gatherer = _Gatherer(parent_submit)

    @property
    def alive(self):
        """Accessor to determine if the executor is alive/active."""
        return not self._shutdown

    @property
    def max_workers(self):
        """The max number of workers that this executor will ever have."""
        return self._max_workers

    @property
    def statistics(self):
        """:class:`.ExecutorStatistics` about the executors executions."""
        return self._gatherer.statistics

    def submit(self, fn, *args, **kwargs):
        """Submit some work to be executed (and gather statistics)."""
        return self._gatherer.submit(fn, *args, **kwargs)
class ProcessPoolExecutor(_process.ProcessPoolExecutor):
    """Executor that uses a process pool to execute calls asynchronously.

    Behaves like the stdlib process pool executor, but routes every
    submission through a gatherer so that statistics are available for
    post-analysis.

    See: https://docs.python.org/dev/library/concurrent.futures.html
    """

    def __init__(self, max_workers=None):
        if max_workers is None:
            # Default worker count is derived from the machines cpu count.
            max_workers = tu.get_optimal_thread_count()
        super(ProcessPoolExecutor, self).__init__(max_workers=max_workers)
        if self._max_workers <= 0:
            raise ValueError("Max workers must be greater than zero")
        # Wrap the *parent* submit (bound to this instance), since that is
        # what actually hands work over to the underlying pool.
        parent_submit = super(ProcessPoolExecutor, self).submit
        self._gatherer = _Gatherer(parent_submit)

    @property
    def alive(self):
        """Accessor to determine if the executor is alive/active."""
        return not self._shutdown_thread

    @property
    def max_workers(self):
        """The max number of workers that this executor will ever have."""
        return self._max_workers

    @property
    def statistics(self):
        """:class:`.ExecutorStatistics` about the executors executions."""
        return self._gatherer.statistics

    def submit(self, fn, *args, **kwargs):
        """Submit some work to be executed (and gather statistics)."""
        return self._gatherer.submit(fn, *args, **kwargs)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
class SynchronousExecutor(_futures.Executor):
    """Executor that uses the caller to execute calls synchronously.

    Presents the executor interface to callers, but runs each submitted
    call inline in the calling thread (no external thread/process), which
    is useful where asynchronous execution is not wanted/needed.

    It gathers statistics about the submissions executed for post-analysis...
    """

    def __init__(self):
        self._shutoff = False
        # Timing must start *before* submit here, since the work runs
        # inline during the submit call itself.
        self._gatherer = _Gatherer(self._submit,
                                   start_before_submit=True)

    @property
    def alive(self):
        """Accessor to determine if the executor is alive/active."""
        return not self._shutoff

    def shutdown(self, wait=True):
        # Nothing runs in the background, so there is nothing to wait on;
        # just flip the flag to refuse further submissions.
        self._shutoff = True

    def restart(self):
        """Restarts this executor (*iff* previously shutoff/shutdown).

        NOTE(harlowja): clears any previously gathered statistics.
        """
        if self._shutoff:
            self._shutoff = False
            self._gatherer.clear()

    @property
    def statistics(self):
        """:class:`.ExecutorStatistics` about the executors executions."""
        return self._gatherer.statistics

    def submit(self, fn, *args, **kwargs):
        """Submit some work to be executed (and gather statistics)."""
        if self._shutoff:
            raise RuntimeError('Can not schedule new futures'
                               ' after being shutdown')
        return self._gatherer.submit(fn, *args, **kwargs)

    def _submit(self, fn, *args, **kwargs):
        # Run the work immediately and hand back an already-resolved future.
        fut = Future()
        _WorkItem(fut, fn, args, kwargs).run()
        return fut
class _GreenWorker(object):
    """Green thread body: runs one work item, then drains overflow work."""

    def __init__(self, executor, work, work_queue):
        self.executor = executor
        self.work = work
        self.work_queue = work_queue

    def __call__(self):
        # Run our main piece of work.
        try:
            self.work.run()
        finally:
            # Before exiting, consume any delayed work that overflowed the
            # pool size so everything submitted eventually gets executed.
            while True:
                try:
                    delayed = self.work_queue.get_nowait()
                except greenqueue.Empty:
                    break
                try:
                    delayed.run()
                finally:
                    self.work_queue.task_done()
class GreenFuture(Future):
    """A future whose condition variable is eventlet/greenthread aware."""

    def __init__(self):
        super(GreenFuture, self).__init__()
        eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green'
                                           ' future'))
        # NOTE(harlowja): swap the built-in condition for a greenthread
        # compatible one so that waiting on this future's result correctly
        # yields to eventlet; without this, blocking waiters would never
        # let the greenthreads run and would wait forever. When threading
        # is already monkey patched the stock condition is already green,
        # so no replacement is needed.
        if not greenpatcher.is_monkey_patched('threading'):
            self._condition = greenthreading.Condition()
class GreenThreadPoolExecutor(_futures.Executor):
    """Executor that uses a green thread pool to execute calls asynchronously.

    See: https://docs.python.org/dev/library/concurrent.futures.html
    and http://eventlet.net/doc/modules/greenpool.html for information on
    how this works.

    It gathers statistics about the submissions executed for post-analysis...
    """

    def __init__(self, max_workers=1000):
        eu.check_for_eventlet(RuntimeError('Eventlet is needed to use a green'
                                           ' executor'))
        if max_workers <= 0:
            raise ValueError("Max workers must be greater than zero")
        self._max_workers = max_workers
        self._pool = greenpool.GreenPool(self._max_workers)
        # Overflow queue for work submitted while the pool is saturated;
        # finishing workers drain it (see _GreenWorker).
        self._delayed_work = greenqueue.Queue()
        self._shutdown_lock = greenthreading.Lock()
        self._shutdown = False
        self._gatherer = _Gatherer(self._submit,
                                   lock_cls=greenthreading.Lock)

    @property
    def alive(self):
        """Accessor to determine if the executor is alive/active."""
        return not self._shutdown

    @property
    def statistics(self):
        """:class:`.ExecutorStatistics` about the executors executions."""
        return self._gatherer.statistics

    def submit(self, fn, *args, **kwargs):
        """Submit some work to be executed (and gather statistics).

        :param args: non-keyworded arguments
        :type args: list
        :param kwargs: key-value arguments
        :type kwargs: dictionary
        """
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('Can not schedule new futures'
                                   ' after being shutdown')
            return self._gatherer.submit(fn, *args, **kwargs)

    def _submit(self, fn, *args, **kwargs):
        fut = GreenFuture()
        work = _WorkItem(fut, fn, args, kwargs)
        if not self._spin_up(work):
            # Pool is saturated; a finishing worker will pick this up.
            self._delayed_work.put(work)
        return fut

    def _spin_up(self, work):
        """Spin up a greenworker if less than max_workers.

        :param work: work to be given to the greenworker
        :returns: whether a green worker was spun up or not
        :rtype: boolean
        """
        busy = self._pool.running() + self._pool.waiting()
        if busy >= self._max_workers:
            return False
        self._pool.spawn_n(_GreenWorker(self, work, self._delayed_work))
        return True

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            # Only the call that flips the flag gets to do the waiting.
            just_shut_down = not self._shutdown
            self._shutdown = True
        if wait and just_shut_down:
            # Wait for in-flight workers and any queued overflow work.
            self._pool.waitall()
            self._delayed_work.join()
class ExecutorStatistics(object):
    """Holds *immutable* information about a executors executions."""

    __slots__ = ['_failures', '_executed', '_runtime', '_cancelled']

    __repr_format = ("failures=%(failures)s, executed=%(executed)s, "
                     "runtime=%(runtime)s, cancelled=%(cancelled)s")

    def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0):
        self._failures = failures
        self._executed = executed
        self._runtime = runtime
        self._cancelled = cancelled

    @property
    def failures(self):
        """How many submissions ended up raising exceptions.

        :returns: how many submissions ended up raising exceptions
        :rtype: number
        """
        return self._failures

    @property
    def executed(self):
        """How many submissions were executed (failed or not).

        :returns: how many submissions were executed
        :rtype: number
        """
        return self._executed

    @property
    def runtime(self):
        """Total runtime of all submissions executed (failed or not).

        :returns: total runtime of all submissions executed
        :rtype: number
        """
        return self._runtime

    @property
    def cancelled(self):
        """How many submissions were cancelled before executing.

        :returns: how many submissions were cancelled before executing
        :rtype: number
        """
        return self._cancelled

    @property
    def average_runtime(self):
        """The average runtime of all submissions executed.

        :returns: average runtime of all submissions executed
        :rtype: number
        :raises: ZeroDivisionError when no executions have occurred.
        """
        return self._runtime / self._executed

    def __repr__(self):
        details = self.__repr_format % ({
            'failures': self._failures,
            'executed': self._executed,
            'runtime': self._runtime,
            'cancelled': self._cancelled,
        })
        cls_name = reflection.get_class_name(self, fully_qualified=False)
        return "%s(%s)" % (cls_name, details)

View File

@ -1,209 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import heapq
import inspect
import threading
from debtcollector import removals
import monotonic
from oslo_utils import reflection
import six
from taskflow import logging
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
def _check_attrs(obj):
"""Checks that a periodic function/method has all the expected attributes.
This will return the expected attributes that were **not** found.
"""
missing_attrs = []
for a in ('_periodic', '_periodic_spacing', '_periodic_run_immediately'):
if not hasattr(obj, a):
missing_attrs.append(a)
return missing_attrs
def periodic(spacing, run_immediately=True):
    """Tags a method/function as wanting/able to execute periodically.

    :param spacing: seconds between executions (must be greater than zero)
    :param run_immediately: option to specify whether to run
                            immediately or not
    :type run_immediately: boolean
    :raises ValueError: if *spacing* is not positive
    """
    if spacing <= 0:
        raise ValueError("Periodicity/spacing must be greater than"
                         " zero instead of %s" % spacing)

    def attach(f):
        # Tag the original so PeriodicWorker can discover/schedule it.
        f._periodic = True
        f._periodic_run_immediately = run_immediately
        f._periodic_spacing = spacing

        @six.wraps(f)
        def runner(*args, **kwargs):
            return f(*args, **kwargs)

        return runner

    return attach
class _Schedule(object):
"""Internal heap-based structure that maintains the schedule/ordering."""
def __init__(self):
self._ordering = []
def push(self, next_run, index):
heapq.heappush(self._ordering, (next_run, index))
def push_next(self, cb, index, now=None):
if now is None:
now = monotonic.monotonic()
self.push(now + cb._periodic_spacing, index)
def __len__(self):
return len(self._ordering)
def pop(self):
return heapq.heappop(self._ordering)
def _build(callables):
    """Partition callables into immediate indexes and a time schedule.

    :param callables: callables tagged by the :py:func:`.periodic` decorator
    :returns: tuple of (immediate index list, populated :class:`_Schedule`)
    """
    schedule = _Schedule()
    immediates = []
    now = None
    # Reverse order is used since these are later popped off (and to
    # ensure the popping order is first -> last we need to append them
    # in the opposite ordering last -> first).
    for index, cb in misc.reverse_enumerate(callables):
        if cb._periodic_run_immediately:
            immediates.append(index)
        else:
            if now is None:
                # Grab the time lazily (and only once) so every callable
                # scheduled in this pass shares a common base time.
                now = monotonic.monotonic()
            schedule.push_next(cb, index, now=now)
    return immediates, schedule
def _safe_call(cb, kind):
    """Call *cb*, logging (but not propagating) any exception it raises.

    A failing callback must not kill the periodic worker loop, so the
    error is logged with its traceback instead of bubbling up.

    :param cb: callable to invoke (takes no arguments)
    :param kind: short label ('immediate' or 'periodic') for the log line
    """
    try:
        cb()
    except Exception:
        # Logger.warn is a deprecated alias; warning() is the real method.
        LOG.warning("Failed to call %s '%r'", kind, cb, exc_info=True)
class PeriodicWorker(object):
    """Calls a collection of callables periodically (sleeping as needed...).

    NOTE(harlowja): typically the :py:meth:`.start` method is executed in a
    background thread so that the periodic callables are executed in
    the background/asynchronously (using the defined periods to determine
    when each is called).
    """

    @classmethod
    def create(cls, objects, exclude_hidden=True):
        """Automatically creates a worker by analyzing object(s) methods.

        Only picks up methods that have been tagged/decorated with
        the :py:func:`.periodic` decorator (does not match against private
        or protected methods unless explicitly requested to).

        :param objects: iterable of objects whose bound methods are scanned
        :param exclude_hidden: skip methods whose name starts with ``_``
        :returns: a new worker wrapping the discovered periodic methods
        """
        callables = []
        for obj in objects:
            for (name, member) in inspect.getmembers(obj):
                if name.startswith("_") and exclude_hidden:
                    continue
                if reflection.is_bound_method(member):
                    # Only keep members that carry *all* the attributes
                    # attached by the @periodic decorator.
                    missing_attrs = _check_attrs(member)
                    if not missing_attrs:
                        callables.append(member)
        return cls(callables)

    @removals.removed_kwarg('tombstone', version="0.8", removal_version="?")
    def __init__(self, callables, tombstone=None):
        # Event used to signal the start() loop that it should stop.
        if tombstone is None:
            self._tombstone = threading.Event()
        else:
            self._tombstone = tombstone
        self._callables = []
        for i, cb in enumerate(callables, 1):
            if not six.callable(cb):
                raise ValueError("Periodic callback %s must be callable" % i)
            missing_attrs = _check_attrs(cb)
            if missing_attrs:
                raise ValueError("Periodic callback %s missing required"
                                 " attributes %s" % (i, missing_attrs))
            if cb._periodic:
                self._callables.append(cb)
        # Indexes to run immediately on start, plus the heap-based time
        # schedule for everything else.
        self._immediates, self._schedule = _build(self._callables)

    def __len__(self):
        # Number of periodic callables this worker manages.
        return len(self._callables)

    def start(self):
        """Starts running (will not return until :py:meth:`.stop` is called).

        NOTE(harlowja): If this worker has no contained callables this raises
        a runtime error and does not run since it is impossible to periodically
        run nothing.
        """
        if not self._callables:
            raise RuntimeError("A periodic worker can not start"
                               " without any callables")
        while not self._tombstone.is_set():
            if self._immediates:
                # Run & schedule its next execution.
                index = self._immediates.pop()
                cb = self._callables[index]
                LOG.blather("Calling immediate '%r'", cb)
                _safe_call(cb, 'immediate')
                self._schedule.push_next(cb, index)
            else:
                # Figure out when we should run next (by selecting the
                # minimum item from the heap, where the minimum should be
                # the callable that needs to run next and has the lowest
                # next desired run time).
                now = monotonic.monotonic()
                next_run, index = self._schedule.pop()
                when_next = next_run - now
                if when_next <= 0:
                    # Run & schedule its next execution.
                    cb = self._callables[index]
                    LOG.blather("Calling periodic '%r' (it runs every"
                                " %s seconds)", cb, cb._periodic_spacing)
                    _safe_call(cb, 'periodic')
                    self._schedule.push_next(cb, index, now=now)
                else:
                    # Gotta wait...
                    # Put the item back and sleep until either the deadline
                    # or a stop request, whichever happens first.
                    self._schedule.push(next_run, index)
                    self._tombstone.wait(when_next)

    def stop(self):
        """Sets the tombstone (this stops any further executions)."""
        self._tombstone.set()

    def reset(self):
        """Resets the tombstone and re-queues up any immediate executions."""
        self._tombstone.clear()
        self._immediates, self._schedule = _build(self._callables)

View File

@ -16,11 +16,11 @@
from concurrent import futures as _futures
from concurrent.futures import _base
import futurist
from oslo_utils import importutils
greenthreading = importutils.try_import('eventlet.green.threading')
from taskflow.types import futures
from taskflow.utils import eventlet_utils as eu
@ -32,7 +32,7 @@ _DONE_STATES = frozenset([
def make_completed_future(result):
"""Make and return a future completed with a given result."""
future = futures.Future()
future = futurist.Future()
future.set_result(result)
return future
@ -47,7 +47,10 @@ def wait_for_any(fs, timeout=None):
Returns pair (done futures, not done futures).
"""
green_fs = sum(1 for f in fs if isinstance(f, futures.GreenFuture))
# TODO(harlowja): remove this when
# https://review.openstack.org/#/c/196269/ is merged and is made
# available.
green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture))
if not green_fs:
return _futures.wait(fs,
timeout=timeout,

View File

@ -15,7 +15,6 @@
# under the License.
import collections
import multiprocessing
import threading
import six
@ -36,17 +35,6 @@ def get_ident():
return _thread.get_ident()
def get_optimal_thread_count():
    """Guess a reasonable thread count for the current system.

    Uses one thread per cpu plus one extra; falls back to two threads
    when the cpu count can not be determined.
    """
    try:
        cpus = multiprocessing.cpu_count()
    except NotImplementedError:
        # NOTE(harlowja): some platforms can not report a cpu count; two
        # threads is a sane minimal default in that case since it is hard
        # to know what else to do.
        return 2
    return cpus + 1
def daemon_thread(target, *args, **kwargs):
"""Makes a daemon thread that calls the given target when started."""
thread = threading.Thread(target=target, args=args, kwargs=kwargs)