Merge "Introducing NeverAgain functionality for periodics"
This commit is contained in:
commit
68557fec45
@ -256,3 +256,109 @@ Running a set of functions periodically (using an executor)
|
|||||||
:hide:
|
:hide:
|
||||||
|
|
||||||
...
|
...
|
||||||
|
|
||||||
|
--------------------------------------------------------------------
|
||||||
|
Stopping periodic function to run again (using NeverAgain exception)
|
||||||
|
--------------------------------------------------------------------
|
||||||
|
|
||||||
|
.. testcode::
|
||||||
|
|
||||||
|
import futurist
|
||||||
|
from futurist import periodics
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
@periodics.periodic(1)
|
||||||
|
def run_only_once(started_at):
|
||||||
|
print("1: %s" % (time.time() - started_at))
|
||||||
|
raise periodics.NeverAgain("No need to run again after first run !!")
|
||||||
|
|
||||||
|
|
||||||
|
@periodics.periodic(1)
|
||||||
|
def keep_running(started_at):
|
||||||
|
print("2: %s" % (time.time() - started_at))
|
||||||
|
|
||||||
|
|
||||||
|
started_at = time.time()
|
||||||
|
callables = [
|
||||||
|
# The function to run + any automatically provided positional and
|
||||||
|
# keyword arguments to provide to it everytime it is activated.
|
||||||
|
(run_only_once, (started_at,), {}),
|
||||||
|
(keep_running, (started_at,), {}),
|
||||||
|
]
|
||||||
|
w = periodics.PeriodicWorker(callables)
|
||||||
|
|
||||||
|
# In this example we will run the periodic functions using a thread, it
|
||||||
|
# is also possible to just call the w.start() method directly if you do
|
||||||
|
# not mind blocking up the current program.
|
||||||
|
t = threading.Thread(target=w.start)
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# Run for 10 seconds and then stop.
|
||||||
|
while (time.time() - started_at) <= 10:
|
||||||
|
time.sleep(0.1)
|
||||||
|
w.stop()
|
||||||
|
w.wait()
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
.. testoutput::
|
||||||
|
:hide:
|
||||||
|
|
||||||
|
...
|
||||||
|
|
||||||
|
-------------------------------------------------------------------
|
||||||
|
Auto stopping the periodic worker when no more periodic work exists
|
||||||
|
-------------------------------------------------------------------
|
||||||
|
|
||||||
|
.. testcode::
|
||||||
|
|
||||||
|
import futurist
|
||||||
|
from futurist import periodics
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
@periodics.periodic(1)
|
||||||
|
def run_only_once(started_at):
|
||||||
|
print("1: %s" % (time.time() - started_at))
|
||||||
|
raise periodics.NeverAgain("No need to run again after first run !!")
|
||||||
|
|
||||||
|
|
||||||
|
@periodics.periodic(2)
|
||||||
|
def run_for_some_time(started_at):
|
||||||
|
print("2: %s" % (time.time() - started_at))
|
||||||
|
if (time.time() - started_at) > 5:
|
||||||
|
raise periodics.NeverAgain("No need to run again !!")
|
||||||
|
|
||||||
|
|
||||||
|
started_at = time.time()
|
||||||
|
callables = [
|
||||||
|
# The function to run + any automatically provided positional and
|
||||||
|
# keyword arguments to provide to it everytime it is activated.
|
||||||
|
(run_only_once, (started_at,), {}),
|
||||||
|
(run_for_some_time, (started_at,), {}),
|
||||||
|
]
|
||||||
|
w = periodics.PeriodicWorker(callables)
|
||||||
|
|
||||||
|
# In this example we will run the periodic functions using a thread, it
|
||||||
|
# is also possible to just call the w.start() method directly if you do
|
||||||
|
# not mind blocking up the current program.
|
||||||
|
t = threading.Thread(target=w.start, kwargs={'auto_stop_when_empty': True})
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# Run for 10 seconds and then check to find out that it had
|
||||||
|
# already stooped.
|
||||||
|
while (time.time() - started_at) <= 10:
|
||||||
|
time.sleep(0.1)
|
||||||
|
print(w.pformat())
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
.. testoutput::
|
||||||
|
:hide:
|
||||||
|
|
||||||
|
...
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import contextlib
|
||||||
import inspect
|
import inspect
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import sys
|
import sys
|
||||||
@ -85,6 +86,14 @@ class Failure(object):
|
|||||||
finally:
|
finally:
|
||||||
del exc_info
|
del exc_info
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exc_type(self):
|
||||||
|
return self.exc_info[0]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exc_value(self):
|
||||||
|
return self.exc_info[1]
|
||||||
|
|
||||||
|
|
||||||
def get_callback_name(cb):
|
def get_callback_name(cb):
|
||||||
"""Tries to get a callbacks fully-qualified name.
|
"""Tries to get a callbacks fully-qualified name.
|
||||||
@ -137,11 +146,24 @@ class Barrier(object):
|
|||||||
self._active = 0
|
self._active = 0
|
||||||
self._cond = cond_cls()
|
self._cond = cond_cls()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active(self):
|
||||||
|
return self._active
|
||||||
|
|
||||||
def incr(self):
|
def incr(self):
|
||||||
with self._cond:
|
with self._cond:
|
||||||
self._active += 1
|
self._active += 1
|
||||||
self._cond.notify_all()
|
self._cond.notify_all()
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def decr_cm(self):
|
||||||
|
with self._cond:
|
||||||
|
self._active -= 1
|
||||||
|
try:
|
||||||
|
yield self._active
|
||||||
|
finally:
|
||||||
|
self._cond.notify_all()
|
||||||
|
|
||||||
def decr(self):
|
def decr(self):
|
||||||
with self._cond:
|
with self._cond:
|
||||||
self._active -= 1
|
self._active -= 1
|
||||||
|
@ -33,11 +33,21 @@ from futurist import _utils as utils
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NeverAgain(Exception):
|
||||||
|
"""Exception to raise to stop further periodic calls for a function.
|
||||||
|
|
||||||
|
When you want a function never run again you can throw this from
|
||||||
|
you periodic function and that will signify to the execution framework
|
||||||
|
to remove that function (and never run it again).
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
|
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
|
||||||
'_periodic_run_immediately')
|
'_periodic_run_immediately')
|
||||||
|
|
||||||
_DEFAULT_COLS = ('Name', 'Active', 'Periodicity', 'Runs in',
|
_DEFAULT_COLS = ('Name', 'Active', 'Periodicity', 'Runs in',
|
||||||
'Runs', 'Failures', 'Successes',
|
'Runs', 'Failures', 'Successes', 'Stop Requested',
|
||||||
'Average elapsed', 'Average elapsed waiting')
|
'Average elapsed', 'Average elapsed waiting')
|
||||||
|
|
||||||
# Constants that are used to determine what 'kind' the current callback
|
# Constants that are used to determine what 'kind' the current callback
|
||||||
@ -67,6 +77,11 @@ class Watcher(object):
|
|||||||
work=self._work,
|
work=self._work,
|
||||||
metrics=self._metrics)
|
metrics=self._metrics)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def requested_stop(self):
|
||||||
|
"""If the work unit being ran has requested to be stopped."""
|
||||||
|
return self._metrics['requested_stop']
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def work(self):
|
def work(self):
|
||||||
"""**Read-only** named work tuple this object watches."""
|
"""**Read-only** named work tuple this object watches."""
|
||||||
@ -343,6 +358,7 @@ class PeriodicWorker(object):
|
|||||||
'elapsed_waiting': 0,
|
'elapsed_waiting': 0,
|
||||||
'failures': 0,
|
'failures': 0,
|
||||||
'successes': 0,
|
'successes': 0,
|
||||||
|
'requested_stop': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
# When scheduling fails temporary, use a random delay between 0.9-1.1 sec.
|
# When scheduling fails temporary, use a random delay between 0.9-1.1 sec.
|
||||||
@ -591,7 +607,7 @@ class PeriodicWorker(object):
|
|||||||
"""How many callables/periodic work units are currently active."""
|
"""How many callables/periodic work units are currently active."""
|
||||||
return len(self._works)
|
return len(self._works)
|
||||||
|
|
||||||
def _run(self, executor, runner):
|
def _run(self, executor, runner, auto_stop_when_empty):
|
||||||
"""Main worker run loop."""
|
"""Main worker run loop."""
|
||||||
barrier = utils.Barrier(cond_cls=self._cond_cls)
|
barrier = utils.Barrier(cond_cls=self._cond_cls)
|
||||||
rnd = random.SystemRandom()
|
rnd = random.SystemRandom()
|
||||||
@ -639,7 +655,6 @@ class PeriodicWorker(object):
|
|||||||
PERIODIC,
|
PERIODIC,
|
||||||
work, index,
|
work, index,
|
||||||
submitted_at))
|
submitted_at))
|
||||||
fut.add_done_callback(lambda _fut: barrier.decr())
|
|
||||||
else:
|
else:
|
||||||
# Gotta wait...
|
# Gotta wait...
|
||||||
self._schedule.push(next_run, index)
|
self._schedule.push(next_run, index)
|
||||||
@ -647,6 +662,7 @@ class PeriodicWorker(object):
|
|||||||
self._waiter.wait(when_next)
|
self._waiter.wait(when_next)
|
||||||
|
|
||||||
def _process_immediates():
|
def _process_immediates():
|
||||||
|
with self._waiter:
|
||||||
try:
|
try:
|
||||||
index = self._immediates.popleft()
|
index = self._immediates.popleft()
|
||||||
except IndexError:
|
except IndexError:
|
||||||
@ -670,14 +686,15 @@ class PeriodicWorker(object):
|
|||||||
IMMEDIATE,
|
IMMEDIATE,
|
||||||
work, index,
|
work, index,
|
||||||
submitted_at))
|
submitted_at))
|
||||||
fut.add_done_callback(lambda _fut: barrier.decr())
|
|
||||||
|
|
||||||
def _on_done(kind, work, index, submitted_at, fut):
|
def _on_done(kind, work, index, submitted_at, fut):
|
||||||
cb = work.callback
|
cb = work.callback
|
||||||
started_at, finished_at, failure = fut.result()
|
started_at, finished_at, failure = fut.result()
|
||||||
cb_metrics, _watcher = self._watchers[index]
|
cb_metrics, _watcher = self._watchers[index]
|
||||||
cb_metrics['runs'] += 1
|
cb_metrics['runs'] += 1
|
||||||
|
schedule_again = True
|
||||||
if failure is not None:
|
if failure is not None:
|
||||||
|
if not issubclass(failure.exc_type, NeverAgain):
|
||||||
cb_metrics['failures'] += 1
|
cb_metrics['failures'] += 1
|
||||||
try:
|
try:
|
||||||
self._on_failure(cb, kind, cb._periodic_spacing,
|
self._on_failure(cb, kind, cb._periodic_spacing,
|
||||||
@ -687,16 +704,34 @@ class PeriodicWorker(object):
|
|||||||
self._log.error("On failure callback %r raised an"
|
self._log.error("On failure callback %r raised an"
|
||||||
" unhandled exception. Error: %s",
|
" unhandled exception. Error: %s",
|
||||||
self._on_failure, exc)
|
self._on_failure, exc)
|
||||||
|
else:
|
||||||
|
cb_metrics['successes'] += 1
|
||||||
|
schedule_again = False
|
||||||
|
self._log.debug("Periodic callback '%s' raised "
|
||||||
|
"'NeverAgain' "
|
||||||
|
"exception, stopping any further "
|
||||||
|
"execution of it.", work.name)
|
||||||
else:
|
else:
|
||||||
cb_metrics['successes'] += 1
|
cb_metrics['successes'] += 1
|
||||||
elapsed = max(0, finished_at - started_at)
|
elapsed = max(0, finished_at - started_at)
|
||||||
elapsed_waiting = max(0, started_at - submitted_at)
|
elapsed_waiting = max(0, started_at - submitted_at)
|
||||||
cb_metrics['elapsed'] += elapsed
|
cb_metrics['elapsed'] += elapsed
|
||||||
cb_metrics['elapsed_waiting'] += elapsed_waiting
|
cb_metrics['elapsed_waiting'] += elapsed_waiting
|
||||||
next_run = self._schedule_strategy(cb, started_at, finished_at,
|
|
||||||
cb_metrics)
|
|
||||||
with self._waiter:
|
with self._waiter:
|
||||||
|
with barrier.decr_cm() as am_left:
|
||||||
|
if schedule_again:
|
||||||
|
next_run = self._schedule_strategy(cb, started_at,
|
||||||
|
finished_at,
|
||||||
|
cb_metrics)
|
||||||
self._schedule.push(next_run, index)
|
self._schedule.push(next_run, index)
|
||||||
|
else:
|
||||||
|
cb_metrics['requested_stop'] = True
|
||||||
|
if (am_left <= 0 and
|
||||||
|
len(self._immediates) == 0 and
|
||||||
|
len(self._schedule) == 0 and
|
||||||
|
auto_stop_when_empty):
|
||||||
|
# Guess nothing left to do, goodbye...
|
||||||
|
self._tombstone.set()
|
||||||
self._waiter.notify_all()
|
self._waiter.notify_all()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -733,7 +768,10 @@ class PeriodicWorker(object):
|
|||||||
for index, work in enumerate(self._works):
|
for index, work in enumerate(self._works):
|
||||||
_cb_metrics, watcher = self._watchers[index]
|
_cb_metrics, watcher = self._watchers[index]
|
||||||
next_run = self._schedule.fetch_next_run(index)
|
next_run = self._schedule.fetch_next_run(index)
|
||||||
if next_run is None:
|
if watcher.requested_stop:
|
||||||
|
active = False
|
||||||
|
runs_in = 'n/a'
|
||||||
|
elif next_run is None:
|
||||||
active = True
|
active = True
|
||||||
runs_in = 'n/a'
|
runs_in = 'n/a'
|
||||||
else:
|
else:
|
||||||
@ -747,6 +785,7 @@ class PeriodicWorker(object):
|
|||||||
'Runs in': runs_in,
|
'Runs in': runs_in,
|
||||||
'Failures': watcher.failures,
|
'Failures': watcher.failures,
|
||||||
'Successes': watcher.successes,
|
'Successes': watcher.successes,
|
||||||
|
'Stop Requested': watcher.requested_stop,
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
cb_row_avgs = [
|
cb_row_avgs = [
|
||||||
@ -803,7 +842,7 @@ class PeriodicWorker(object):
|
|||||||
self._waiter.notify_all()
|
self._waiter.notify_all()
|
||||||
return watcher
|
return watcher
|
||||||
|
|
||||||
def start(self, allow_empty=False):
|
def start(self, allow_empty=False, auto_stop_when_empty=False):
|
||||||
"""Starts running (will not return until :py:meth:`.stop` is called).
|
"""Starts running (will not return until :py:meth:`.stop` is called).
|
||||||
|
|
||||||
:param allow_empty: instead of running with no callbacks raise when
|
:param allow_empty: instead of running with no callbacks raise when
|
||||||
@ -814,6 +853,16 @@ class PeriodicWorker(object):
|
|||||||
sleep (until either stopped or callbacks are
|
sleep (until either stopped or callbacks are
|
||||||
added)
|
added)
|
||||||
:type allow_empty: bool
|
:type allow_empty: bool
|
||||||
|
:param auto_stop_when_empty: when the provided periodic functions have
|
||||||
|
all exited and this is false then the
|
||||||
|
thread responsible for executing those
|
||||||
|
methods will just spin/idle waiting for
|
||||||
|
a new periodic function to be added;
|
||||||
|
switching it to true will make this
|
||||||
|
idling not happen (and instead when no
|
||||||
|
more periodic work exists then the
|
||||||
|
calling thread will just return).
|
||||||
|
:type auto_stop_when_empty: bool
|
||||||
"""
|
"""
|
||||||
if not self._works and not allow_empty:
|
if not self._works and not allow_empty:
|
||||||
raise RuntimeError("A periodic worker can not start"
|
raise RuntimeError("A periodic worker can not start"
|
||||||
@ -836,7 +885,7 @@ class PeriodicWorker(object):
|
|||||||
self._dead.clear()
|
self._dead.clear()
|
||||||
self._active.set()
|
self._active.set()
|
||||||
try:
|
try:
|
||||||
self._run(executor, runner)
|
self._run(executor, runner, auto_stop_when_empty)
|
||||||
finally:
|
finally:
|
||||||
if getattr(self._executor_factory, 'shutdown', True):
|
if getattr(self._executor_factory, 'shutdown', True):
|
||||||
executor.shutdown()
|
executor.shutdown()
|
||||||
|
@ -434,6 +434,83 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
|
|||||||
|
|
||||||
m.assert_called_with('foo', bar='baz')
|
m.assert_called_with('foo', bar='baz')
|
||||||
|
|
||||||
|
def test_never_again_exc(self):
|
||||||
|
|
||||||
|
m_1 = mock.MagicMock()
|
||||||
|
m_2 = mock.MagicMock()
|
||||||
|
|
||||||
|
@periodics.periodic(0.5)
|
||||||
|
def run_only_once():
|
||||||
|
m_1()
|
||||||
|
raise periodics.NeverAgain("No need to run again !!")
|
||||||
|
|
||||||
|
@periodics.periodic(0.5)
|
||||||
|
def keep_running():
|
||||||
|
m_2()
|
||||||
|
|
||||||
|
callables = [
|
||||||
|
(run_only_once, None, None),
|
||||||
|
(keep_running, None, None),
|
||||||
|
]
|
||||||
|
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||||
|
w = periodics.PeriodicWorker(callables,
|
||||||
|
executor_factory=executor_factory,
|
||||||
|
**self.worker_kwargs)
|
||||||
|
with self.create_destroy(w.start):
|
||||||
|
self.sleep(2.0)
|
||||||
|
w.stop()
|
||||||
|
|
||||||
|
for watcher in w.iter_watchers():
|
||||||
|
self.assertGreaterEqual(watcher.runs, 1)
|
||||||
|
self.assertGreaterEqual(watcher.successes, 1)
|
||||||
|
self.assertEqual(watcher.failures, 0)
|
||||||
|
|
||||||
|
self.assertEqual(m_1.call_count, 1)
|
||||||
|
self.assertGreaterEqual(m_2.call_count, 3)
|
||||||
|
|
||||||
|
def test_start_with_auto_stop_when_empty_set(self):
|
||||||
|
|
||||||
|
@periodics.periodic(0.5)
|
||||||
|
def run_only_once():
|
||||||
|
raise periodics.NeverAgain("No need to run again !!")
|
||||||
|
|
||||||
|
callables = [
|
||||||
|
(run_only_once, None, None),
|
||||||
|
(run_only_once, None, None),
|
||||||
|
]
|
||||||
|
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||||
|
w = periodics.PeriodicWorker(callables,
|
||||||
|
executor_factory=executor_factory,
|
||||||
|
**self.worker_kwargs)
|
||||||
|
with self.create_destroy(w.start, auto_stop_when_empty=True):
|
||||||
|
self.sleep(2.0)
|
||||||
|
|
||||||
|
for watcher in w.iter_watchers():
|
||||||
|
self.assertGreaterEqual(watcher.runs, 1)
|
||||||
|
self.assertGreaterEqual(watcher.successes, 1)
|
||||||
|
self.assertEqual(watcher.failures, 0)
|
||||||
|
self.assertEqual(watcher.requested_stop, True)
|
||||||
|
|
||||||
|
def test_add_with_auto_stop_when_empty_set(self):
|
||||||
|
m = mock.Mock()
|
||||||
|
|
||||||
|
@periodics.periodic(0.5)
|
||||||
|
def run_only_once():
|
||||||
|
raise periodics.NeverAgain("No need to run again !!")
|
||||||
|
|
||||||
|
callables = [
|
||||||
|
(run_only_once, None, None),
|
||||||
|
]
|
||||||
|
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||||
|
w = periodics.PeriodicWorker(callables,
|
||||||
|
executor_factory=executor_factory,
|
||||||
|
**self.worker_kwargs)
|
||||||
|
with self.create_destroy(w.start, auto_stop_when_empty=True):
|
||||||
|
self.sleep(2.0)
|
||||||
|
w.add(every_half_sec, m, None)
|
||||||
|
|
||||||
|
m.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
|
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
|
||||||
MAX_REJECTIONS_COUNT = 2
|
MAX_REJECTIONS_COUNT = 2
|
||||||
|
Loading…
Reference in New Issue
Block a user