Add a task counter to cluster task scheduler

PYTHON-473

Fixes a TypeError when tasks collide in time and function comparison is attempted
(Python 3 only).
https://docs.python.org/2/library/heapq.html#priority-queue-implementation-notes
This commit is contained in:
Adam Holmberg
2016-02-22 13:38:05 -06:00
parent a98078b5ad
commit 635f554055
2 changed files with 9 additions and 14 deletions

View File

@@ -41,7 +41,7 @@ except ImportError:
from cassandra.util import WeakSet # NOQA from cassandra.util import WeakSet # NOQA
from functools import partial, wraps from functools import partial, wraps
from itertools import groupby from itertools import groupby, count
from cassandra import (ConsistencyLevel, AuthenticationFailed, from cassandra import (ConsistencyLevel, AuthenticationFailed,
OperationTimedOut, UnsupportedOperation, OperationTimedOut, UnsupportedOperation,
@@ -2542,6 +2542,7 @@ class _Scheduler(object):
def __init__(self, executor): def __init__(self, executor):
self._queue = Queue.PriorityQueue() self._queue = Queue.PriorityQueue()
self._scheduled_tasks = set() self._scheduled_tasks = set()
self._count = count()
self._executor = executor self._executor = executor
t = Thread(target=self.run, name="Task Scheduler") t = Thread(target=self.run, name="Task Scheduler")
@@ -2559,7 +2560,7 @@ class _Scheduler(object):
# this can happen on interpreter shutdown # this can happen on interpreter shutdown
pass pass
self.is_shutdown = True self.is_shutdown = True
self._queue.put_nowait((0, None)) self._queue.put_nowait((0, 0, None))
def schedule(self, delay, fn, *args, **kwargs): def schedule(self, delay, fn, *args, **kwargs):
self._insert_task(delay, (fn, args, tuple(kwargs.items()))) self._insert_task(delay, (fn, args, tuple(kwargs.items())))
@@ -2575,7 +2576,7 @@ class _Scheduler(object):
if not self.is_shutdown: if not self.is_shutdown:
run_at = time.time() + delay run_at = time.time() + delay
self._scheduled_tasks.add(task) self._scheduled_tasks.add(task)
self._queue.put_nowait((run_at, task)) self._queue.put_nowait((run_at, next(self._count), task))
else: else:
log.debug("Ignoring scheduled task after shutdown: %r", task) log.debug("Ignoring scheduled task after shutdown: %r", task)
@@ -2586,7 +2587,7 @@ class _Scheduler(object):
try: try:
while True: while True:
run_at, task = self._queue.get(block=True, timeout=None) run_at, i, task = self._queue.get(block=True, timeout=None)
if self.is_shutdown: if self.is_shutdown:
log.debug("Not executing scheduled task due to Scheduler shutdown") log.debug("Not executing scheduled task due to Scheduler shutdown")
return return
@@ -2597,7 +2598,7 @@ class _Scheduler(object):
future = self._executor.submit(fn, *args, **kwargs) future = self._executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._log_if_failed) future.add_done_callback(self._log_if_failed)
else: else:
self._queue.put_nowait((run_at, task)) self._queue.put_nowait((run_at, i, task))
break break
except Queue.Empty: except Queue.Empty:
pass pass

View File

@@ -419,19 +419,13 @@ class TimerTest(unittest.TestCase):
def test_timer_collision(self): def test_timer_collision(self):
# simple test demonstrating #466 # simple test demonstrating #466
def f1():
pass
def f2():
pass
# same timeout, comparison will defer to the Timer object itself # same timeout, comparison will defer to the Timer object itself
t1 = Timer(0, f1) t1 = Timer(0, lambda: None)
t2 = Timer(0, f2) t2 = Timer(0, lambda: None)
t2.end = t1.end t2.end = t1.end
tm = TimerManager() tm = TimerManager()
tm.add_timer(t1) tm.add_timer(t1)
tm.add_timer(t2) tm.add_timer(t2)
# Prior to $466: "TypeError: unorderable types: Timer() < Timer()" # Prior to #466: "TypeError: unorderable types: Timer() < Timer()"
tm.service_timeouts() tm.service_timeouts()